线程池原理

为什么要使用线程池

使用线程池主要有以下三个原因:

  1. 创建/销毁线程需要消耗系统资源,线程池可以复用已创建的线程
  2. 控制并发的数量。并发数量过多,可能会导致资源消耗过多,从而造成服务器崩溃。(主要原因)
  3. 可以对线程做统一管理

线程池的原理

Java中的线程池顶层接口是Executor接口,ThreadPoolExecutor是这个接口的实现类。

我们先看看ThreadPoolExecutor类。

ThreadPoolExecutor提供的构造方法

一共有四个构造方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 五个参数的构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)

// 六个参数的构造函数-1
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory)

// 六个参数的构造函数-2
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)

// 七个参数的构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)

涉及到5~7个参数,我们先看看必须的5个参数是什么意思:

  • int corePoolSize:该线程池中核心线程数最大值

    核心线程:线程池中有两类线程,核心线程和非核心线程。核心线程默认情况下会一直存在于线程池中,即使这个核心线程什么都不干(铁饭碗),而非核心线程如果长时间的闲置,就会被销毁(临时工)。

  • int maximumPoolSize:该线程池中线程总数最大值

    该值等于核心线程数量 + 非核心线程数量。

  • long keepAliveTime非核心线程闲置超时时长

    非核心线程如果处于闲置状态超过该值,就会被销毁。如果设置allowCoreThreadTimeOut(true),则会也作用于核心线程。

  • TimeUnit unit:keepAliveTime的单位。

TimeUnit是一个枚举类型 ,包括以下属性:

NANOSECONDS : 1微毫秒 = 1微秒 / 1000 MICROSECONDS : 1微秒 = 1毫秒 / 1000 MILLISECONDS : 1毫秒 = 1秒 /1000 SECONDS : 秒 MINUTES : 分 HOURS : 小时 DAYS : 天

  • BlockingQueue workQueue:阻塞队列,维护着等待执行的Runnable任务对象

    常用的几个阻塞队列:

    1. LinkedBlockingQueue

      链式阻塞队列,底层数据结构是链表,默认大小是Integer.MAX_VALUE,也可以指定大小。

    2. ArrayBlockingQueue

      数组阻塞队列,底层数据结构是数组,需要指定队列的大小。

    3. SynchronousQueue

      同步队列,内部容量为0,每个put操作必须等待一个take操作,反之亦然。

    4. DelayQueue

      延迟队列,该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素 。

我们将在下一章中重点介绍各种阻塞队列

好了,介绍完5个必须的参数之后,还有两个非必须的参数。

  • ThreadFactory threadFactory

    创建线程的工厂 ,用于批量创建线程,统一在创建线程时设置一些参数,如是否守护线程、线程的优先级等。如果不指定,会新建一个默认的线程工厂。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
static class DefaultThreadFactory implements ThreadFactory {
// 省略属性
// 构造函数
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}

// 省略
}
  • RejectedExecutionHandler handler

    拒绝处理策略,线程数量大于最大线程数就会采用拒绝处理策略,四种拒绝处理的策略为 :

    1. ThreadPoolExecutor.AbortPolicy默认拒绝处理策略,丢弃任务并抛出RejectedExecutionException异常。
    2. ThreadPoolExecutor.DiscardPolicy:丢弃新来的任务,但是不抛出异常。
    3. ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列头部(最旧的)的任务,然后重新尝试执行程序(如果再次失败,重复此过程)。
    4. ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务。

ThreadPoolExecutor的策略

线程池本身有一个调度线程,这个线程就是用于管理布控整个线程池里的各种任务和事务,例如创建线程、销毁线程、任务队列管理、线程队列管理等等。

故线程池也有自己的状态。ThreadPoolExecutor类中使用了一些final int常量变量来表示线程池的状态 ,分别为RUNNING、SHUTDOWN、STOP、TIDYING 、TERMINATED。

1
2
3
4
5
6
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
  • 线程池创建后处于RUNNING状态。

  • 调用shutdown()方法后处于SHUTDOWN状态,线程池不能接受新的任务,清除一些空闲worker,会等待阻塞队列的任务完成。

  • 调用shutdownNow()方法后处于STOP状态,线程池不能接受新的任务,中断所有线程,阻塞队列中没有被执行的任务全部丢弃。此时,poolsize=0,阻塞队列的size也为0。

  • 当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。接着会执行terminated()函数。

    ThreadPoolExecutor中有一个控制状态的属性叫ctl,它是一个AtomicInteger类型的变量。线程池状态就是通过AtomicInteger类型的成员变量ctl来获取的。

    获取的ctl值传入runStateOf方法,与~CAPACITY位与运算(CAPACITY是低29位全1的int变量)。

    ~CAPACITY在这里相当于掩码,用来获取ctl的高3位,表示线程池状态;而另外的低29位用于表示工作线程数

  • 线程池处在TIDYING状态时,执行完terminated()方法之后,就会由 TIDYING -> TERMINATED, 线程池被设置为TERMINATED状态。

线程池主要的任务处理流程

处理任务的核心方法是execute,我们看看 JDK 1.8 源码中ThreadPoolExecutor是如何处理线程任务的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// JDK 1.8 
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1.当前线程数小于corePoolSize,则调用addWorker创建核心线程执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2.如果不小于corePoolSize,则将任务添加到workQueue队列。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 2.1 如果isRunning返回false(状态检查),则remove这个任务,然后执行拒绝策略。
if (! isRunning(recheck) && remove(command))
reject(command);
// 2.2 线程池处于running状态,但是没有线程,则创建线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3.如果放入workQueue失败,则创建非核心线程执行任务,
// 如果这时创建非核心线程失败(当前线程总数不小于maximumPoolSize时),就会执行拒绝策略。
else if (!addWorker(command, false))
reject(command);
}

ctl.get()是获取线程池状态,用int类型表示。第二步中,入队前进行了一次isRunning判断,入队之后,又进行了一次isRunning判断。

为什么要二次检查线程池的状态?

在多线程的环境下,线程池的状态是时刻发生变化的。很有可能刚获取线程池状态后线程池状态就改变了。判断是否将command加入workqueue是线程池之前的状态。倘若没有二次检查,万一线程池处于非RUNNING状态(在多线程环境下很有可能发生),那么command永远不会执行。

总结一下处理流程

  1. 线程总数量 < corePoolSize,无论线程是否空闲,都会新建一个核心线程执行任务(让核心线程数量快速达到corePoolSize,在核心线程数量 < corePoolSize时)。注意,这一步需要获得全局锁。
  2. 线程总数量 >= corePoolSize时,新来的线程任务会进入任务队列中等待,然后空闲的核心线程会依次去缓存队列中取任务来执行(体现了线程复用)。
  3. 当缓存队列满了,说明这个时候任务已经多到爆棚,需要一些“临时工”来执行这些任务了。于是会创建非核心线程去执行这个任务。注意,这一步需要获得全局锁。
  4. 缓存队列满了, 且总线程数达到了maximumPoolSize,则会采取上面提到的拒绝策略进行处理。

整个过程如图所示:

img

ThreadPoolExecutor如何做到线程复用的?

我们知道,一个线程在创建的时候会指定一个线程任务,当执行完这个线程任务之后,线程自动销毁。但是线程池却可以复用线程,即一个线程执行完线程任务后不销毁,继续执行另外的线程任务。那么,线程池如何做到线程复用呢?

原来,ThreadPoolExecutor在创建线程时,会将线程封装成工作线程worker,并放入工作线程组中,然后这个worker反复从阻塞队列中拿任务去执行。话不多说,我们继续看看源码(一定要仔细看,前后有联系)

这里的addWorker方法是在上面提到的execute方法里面调用的,先看看上半部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// ThreadPoolExecutor.addWorker方法源码上半部分
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
// 1.如果core是ture,证明需要创建的线程为核心线程,则先判断当前线程是否大于核心线程
// 如果core是false,证明需要创建的是非核心线程,则先判断当前线程数是否大于总线程数
// 如果不小于,则返回false
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

上半部分主要是判断线程数量是否超出阈值,超过了就返回false。我们继续看下半部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
    // ThreadPoolExecutor.addWorker方法源码下半部分
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 1.创建一个worker对象
w = new Worker(firstTask);
// 2.实例化一个Thread对象
final Thread t = w.thread;
if (t != null) {
// 3.线程池全局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());

if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 4.启动这个线程
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

创建worker对象,并初始化一个Thread对象,然后启动这个线程对象。

我们接着看看Worker类,仅展示部分源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// Worker类部分源码
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{
final Thread thread;
Runnable firstTask;

Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

public void run() {
runWorker(this);
}
//其余代码略...
}

Worker类实现了Runnable接口,所以Worker也是一个线程任务。在构造方法中,创建了一个线程,线程的任务就是自己。故addWorker方法调用addWorker方法源码下半部分中的第4步t.start,会触发Worker类的run方法被JVM调用。

我们再看看runWorker的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// Worker.runWorker方法源代码
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 1.线程启动之后,通过unlock方法释放锁
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 2.Worker执行firstTask或从workQueue中获取任务,如果getTask方法不返回null,循环不退出
while (task != null || (task = getTask()) != null) {
// 2.1进行加锁操作,保证thread不被其他线程中断(除非线程池被中断)
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 2.2检查线程池状态,倘若线程池处于中断状态,当前线程将中断。
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 2.3执行beforeExecute
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 2.4执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 2.5执行afterExecute方法
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
// 2.6解锁操作
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}

首先去执行创建这个worker时就有的任务,当执行完这个任务后,worker的生命周期并没有结束,在while循环中,worker会不断地调用getTask方法从阻塞队列中获取任务然后调用task.run()执行任务,从而达到复用线程的目的。只要getTask方法不返回null,此线程就不会退出。

当然,核心线程池中创建的线程想要拿到阻塞队列中的任务,先要判断线程池的状态,如果STOP或者TERMINATED,返回null

最后看看getTask方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// Worker.getTask方法源码
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (;;) {
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}

int wc = workerCountOf(c);

// Are workers subject to culling?
// 1.allowCoreThreadTimeOut变量默认是false,核心线程即使空闲也不会被销毁
// 如果为true,核心线程在keepAliveTime内仍空闲则会被销毁。
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 2.如果运行线程数超过了最大线程数,但是缓存队列已经空了,这时递减worker数量。
     // 如果有设置允许线程超时或者线程数量超过了核心线程数量,
// 并且线程在规定时间内均未poll到任务且队列为空则递减worker数量
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
// 3.如果timed为true(想想哪些情况下timed为true),则会调用workQueue的poll方法获取任务.
// 超时时间是keepAliveTime。如果超过keepAliveTime时长,
// poll返回了null,上边提到的while循序就会退出,线程也就执行完了。
// 如果timed为false(allowCoreThreadTimeOut为falsefalse
// 且wc > corePoolSize为false),则会调用workQueue的take方法阻塞在当前。
// 队列中有任务加入时,线程被唤醒,take方法返回任务,并执行。
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}

核心线程的会一直卡在workQueue.take方法,被阻塞并挂起,不会占用CPU资源,直到拿到Runnable 然后返回(当然如果allowCoreThreadTimeOut设置为true,那么核心线程就会去调用poll方法,因为poll可能会返回null,所以这时候核心线程满足超时条件也会被销毁)。

非核心线程会workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ,如果超时还没有拿到,下一次循环判断compareAndDecrementWorkerCount就会返回null,Worker对象的run()方法循环体的判断为null,任务结束,然后线程被系统回收 。

源码解析完毕,你理解的源码是否和图中的处理流程一致?如果不一致,那么就多看两遍吧,加油。

四种常见的线程池

Executors类中提供的几个静态方法来创建线程池。大家到了这一步,如果看懂了前面讲的ThreadPoolExecutor构造方法中各种参数的意义,那么一看到Executors类中提供的线程池的源码就应该知道这个线程池是干嘛的。

newCachedThreadPool

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

CacheThreadPool运行流程如下:

  1. 提交任务进线程池。
  2. 因为corePoolSize为0的关系,不创建核心线程,线程池最大为Integer.MAX_VALUE。
  3. 尝试将任务添加到SynchronousQueue队列。
  4. 如果SynchronousQueue入列成功,等待被当前运行的线程空闲后拉取执行。如果当前没有空闲线程,那么就创建一个非核心线程,然后从SynchronousQueue拉取任务并在当前线程执行。
  5. 如果SynchronousQueue已有任务在等待,入列操作将会阻塞。

当需要执行很多短时间的任务时,CacheThreadPool的线程复用率比较高, 会显著的提高性能。而且线程60s后会回收,意味着即使没有任务进来,CacheThreadPool并不会占用很多资源。

newFixedThreadPool

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

核心线程数量和总线程数量相等,都是传入的参数nThreads,所以只能创建核心线程,不能创建非核心线程。因为LinkedBlockingQueue的默认大小是Integer.MAX_VALUE,故如果核心线程空闲,则交给核心线程处理;如果核心线程不空闲,则入列等待,直到核心线程空闲。

与CachedThreadPool的区别

  • 因为 corePoolSize == maximumPoolSize ,所以FixedThreadPool只会创建核心线程。 而CachedThreadPool因为corePoolSize=0,所以只会创建非核心线程。
  • 在 getTask() 方法,如果队列里没有任务可取,线程会一直阻塞在 LinkedBlockingQueue.take() ,线程不会被回收。 CachedThreadPool会在60s后收回。
  • 由于线程不会被回收,会一直卡在阻塞,所以没有任务的情况下, FixedThreadPool占用资源更多
  • 都几乎不会触发拒绝策略,但是原理不同。FixedThreadPool是因为阻塞队列可以很大(最大为Integer最大值),故几乎不会触发拒绝策略;CachedThreadPool是因为线程池很大(最大为Integer最大值),几乎不会导致线程数量大于最大线程数,故几乎不会触发拒绝策略。

newSingleThreadExecutor

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

有且仅有一个核心线程( corePoolSize == maximumPoolSize=1),使用了LinkedBlockingQueue(容量很大),所以,不会创建非核心线程。所有任务按照先来先执行的顺序执行。如果这个唯一的线程不空闲,那么新来的任务会存储在任务队列里等待执行。

newScheduledThreadPool

创建一个定长线程池,支持定时及周期性任务执行。

1
2
3
4
5
6
7
8
9
10
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}

//ScheduledThreadPoolExecutor():
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
new DelayedWorkQueue());
}

四种常见的线程池基本够我们使用了,但是《阿里巴巴开发手册》不建议我们直接使用Executors类中的线程池,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学需要更加明确线程池的运行规则,规避资源耗尽的风险。

但如果你及团队本身对线程池非常熟悉,又确定业务规模不会大到资源耗尽的程度(比如线程数量或任务队列长度可能达到Integer.MAX_VALUE)时,其实是可以使用JDK提供的这几个接口的,它能让我们的代码具有更强的可读性。

阻塞队列

阻塞队列的由来

我们假设一种场景,生产者一直生产资源,消费者一直消费资源,资源存储在一个缓冲池中,生产者将生产的资源存进缓冲池中,消费者从缓冲池中拿到资源进行消费,这就是大名鼎鼎的生产者-消费者模式

该模式能够简化开发过程,一方面消除了生产者类与消费者类之间的代码依赖性,另一方面将生产数据的过程与使用数据的过程解耦简化负载。

我们自己coding实现这个模式的时候,因为需要让多个线程操作共享变量(即资源),所以很容易引发线程安全问题,造成重复消费死锁,尤其是生产者和消费者存在多个的情况。另外,当缓冲池空了,我们需要阻塞消费者,唤醒生产者;当缓冲池满了,我们需要阻塞生产者,唤醒消费者,这些个等待-唤醒逻辑都需要自己实现。(这块不明白的同学,可以看最下方结语部分的链接)

这么容易出错的事情,JDK当然帮我们做啦,这就是阻塞队列(BlockingQueue),你只管往里面存、取就行,而不用担心多线程环境下存、取共享变量的线程安全问题。

BlockingQueue是Java util.concurrent包下重要的数据结构,区别于普通的队列,BlockingQueue提供了线程安全的队列访问方式,并发包下很多高级同步类的实现都是基于BlockingQueue实现的。

BlockingQueue一般用于生产者-消费者模式,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。BlockingQueue就是存放元素的容器

BlockingQueue的操作方法

阻塞队列提供了四组不同的方法用于插入、移除、检查元素:

方法\处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() - -
  • 抛出异常:如果试图的操作无法立即执行,抛异常。当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。
  • 返回特殊值:如果试图的操作无法立即执行,返回一个特殊值,通常是true / false。
  • 一直阻塞:如果试图的操作无法立即执行,则一直阻塞或者响应中断。
  • 超时退出:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功,通常是 true / false。

注意之处

  • 不能往阻塞队列中插入null,会抛出空指针异常。
  • 可以访问阻塞队列中的任意元素,调用remove(o)可以将队列之中的特定对象移除,但并不高效,尽量避免使用。

BlockingQueue的实现类

ArrayBlockingQueue

数组结构组成的有界阻塞队列。内部结构是数组,故具有数组的特性。

1
2
3
public ArrayBlockingQueue(int capacity, boolean fair){
//..省略代码
}

可以初始化队列大小, 且一旦初始化不能改变。构造方法中的fair表示控制对象的内部锁是否采用公平锁,默认是非公平锁

LinkedBlockingQueue

链表结构组成的有界阻塞队列。内部结构是链表,具有链表的特性。默认队列的大小是Integer.MAX_VALUE,也可以指定大小。此队列按照先进先出的原则对元素进行排序。

DelayQueue

1
2
3
该队列中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素 。注入其中的元素必须实现 java.util.concurrent.Delayed 接口。 

DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

PriorityBlockingQueue

1
基于优先级的无界阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),内部控制线程同步的锁采用的是非公平锁。

网上大部分博客上PriorityBlockingQueue为公平锁,其实是不对的,查阅源码(感谢github:ambition0802同学的指出):

1
2
3
4
5
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
this.lock = new ReentrantLock(); //默认构造方法-非公平锁
...//其余代码略
}

SynchronousQueue

这个队列比较特殊,没有任何内部容量,甚至连一个队列的容量都没有。并且每个 put 必须等待一个 take,反之亦然。

需要区别容量为1的ArrayBlockingQueue、LinkedBlockingQueue。

以下方法的返回值,可以帮助理解这个队列:

  • iterator() 永远返回空,因为里面没有东西
  • peek() 永远返回null
  • put() 往queue放进去一个element以后就一直wait直到有其他thread进来把这个element取走。
  • offer() 往queue里放一个element后立即返回,如果碰巧这个element被另一个thread取走了,offer方法返回true,认为offer成功;否则返回false。
  • take() 取出并且remove掉queue里的element,取不到东西他会一直等。
  • poll() 取出并且remove掉queue里的element,只有到碰巧另外一个线程正在往queue里offer数据或者put数据的时候,该方法才会取到东西。否则立即返回null。
  • isEmpty() 永远返回true
  • remove()&removeAll() 永远返回false

注意

PriorityBlockingQueue不会阻塞数据生产者(因为队列是无界的),而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。对于使用默认大小的LinkedBlockingQueue也是一样的。

阻塞队列的原理

阻塞队列的原理很简单,利用了Lock锁的多条件(Condition)阻塞控制。接下来我们分析ArrayBlockingQueue JDK 1.8 的源码。

首先是构造器,除了初始化队列的大小和是否是公平锁之外,还对同一个锁(lock)初始化了两个监视器,分别是notEmpty和notFull。这两个监视器的作用目前可以简单理解为标记分组,当该线程是put操作时,给他加上监视器notFull,标记这个线程是一个生产者;当线程是take操作时,给他加上监视器notEmpty,标记这个线程是消费者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
//数据元素数组
final Object[] items;
//下一个待取出元素索引
int takeIndex;
//下一个待添加元素索引
int putIndex;
//元素个数
int count;
//内部锁
final ReentrantLock lock;
//消费者监视器
private final Condition notEmpty;
//生产者监视器
private final Condition notFull;

public ArrayBlockingQueue(int capacity, boolean fair) {
//..省略其他代码
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}

put操作的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 1.自旋拿锁
lock.lockInterruptibly();
try {
// 2.判断队列是否满了
while (count == items.length)
// 2.1如果满了,阻塞该线程,并标记为notFull线程,
// 等待notFull的唤醒,唤醒之后继续执行while循环。
notFull.await();
// 3.如果没有满,则进入队列
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
// 4 唤醒一个等待的线程
notEmpty.signal();
}

总结put的流程:

  1. 所有执行put操作的线程竞争lock锁,拿到了lock锁的线程进入下一步,没有拿到lock锁的线程自旋竞争锁。
  2. 判断阻塞队列是否满了,如果满了,则调用await方法阻塞这个线程,并标记为notFull(生产者)线程,同时释放lock锁,等待被消费者线程唤醒。
  3. 如果没有满,则调用enqueue方法将元素put进阻塞队列。注意这一步的线程还有一种情况是第二步中阻塞的线程被唤醒且又拿到了lock锁的线程。
  4. 唤醒一个标记为notEmpty(消费者)的线程。

take操作的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}

take操作和put操作的流程是类似的,总结一下take操作的流程:

  1. 所有执行take操作的线程竞争lock锁,拿到了lock锁的线程进入下一步,没有拿到lock锁的线程自旋竞争锁。
  2. 判断阻塞队列是否为空,如果是空,则调用await方法阻塞这个线程,并标记为notEmpty(消费者)线程,同时释放lock锁,等待被生产者线程唤醒。
  3. 如果没有空,则调用dequeue方法。注意这一步的线程还有一种情况是第二步中阻塞的线程被唤醒且又拿到了lock锁的线程。
  4. 唤醒一个标记为notFull(生产者)的线程。

注意

  1. put和take操作都需要先获取锁,没有获取到锁的线程会被挡在第一道大门之外自旋拿锁,直到获取到锁。
  2. 就算拿到锁了之后,也不一定会顺利进行put/take操作,需要判断队列是否可用(是否满/空),如果不可用,则会被阻塞,并释放锁
  3. 在第2点被阻塞的线程会被唤醒,但是在唤醒之后,依然需要拿到锁才能继续往下执行,否则,自旋拿锁,拿到锁了再while判断队列是否可用(这也是为什么不用if判断,而使用while判断的原因)。

示例和使用场景

生产者-消费者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class Test {
private int queueSize = 10;
private ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(queueSize);

public static void main(String[] args) {
Test test = new Test();
Producer producer = test.new Producer();
Consumer consumer = test.new Consumer();

producer.start();
consumer.start();
}

class Consumer extends Thread{

@Override
public void run() {
consume();
}

private void consume() {
while(true){
try {
queue.take();
System.out.println("从队列取走一个元素,队列剩余"+queue.size()+"个元素");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

class Producer extends Thread{

@Override
public void run() {
produce();
}

private void produce() {
while(true){
try {
queue.put(1);
System.out.println("向队列取中插入一个元素,队列剩余空间:"+(queueSize-queue.size()));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}

下面是这个例子的输出片段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
从队列取走一个元素,队列剩余0个元素
从队列取走一个元素,队列剩余0个元素
向队列取中插入一个元素,队列剩余空间:9
向队列取中插入一个元素,队列剩余空间:9
向队列取中插入一个元素,队列剩余空间:9
向队列取中插入一个元素,队列剩余空间:8
向队列取中插入一个元素,队列剩余空间:7
向队列取中插入一个元素,队列剩余空间:6
向队列取中插入一个元素,队列剩余空间:5
向队列取中插入一个元素,队列剩余空间:4
向队列取中插入一个元素,队列剩余空间:3
向队列取中插入一个元素,队列剩余空间:2
向队列取中插入一个元素,队列剩余空间:1
向队列取中插入一个元素,队列剩余空间:0
从队列取走一个元素,队列剩余1个元素
从队列取走一个元素,队列剩余9个元素

注意,这个例子中的输出结果看起来可能有问题,比如有几行在插入一个元素之后,队列的剩余空间不变。这是由于System.out.println语句没有锁。考虑到这样的情况:线程1在执行完put/take操作后立即失去CPU时间片,然后切换到线程2执行put/take操作,执行完毕后回到线程1的System.out.println语句并输出,发现这个时候阻塞队列的size已经被线程2改变了,所以这个时候输出的size并不是当时线程1执行完put/take操作之后阻塞队列的size,但可以确保的是size不会超过10个。实际上使用阻塞队列是没有问题的。

线程池中使用阻塞队列

1
2
3
4
5
6
7
8
 public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

Java中的线程池就是使用阻塞队列实现的,我们在了解阻塞队列之后,无论是使用Executors类中已经提供的线程池,还是自己通过ThreadPoolExecutor实现线程池,都会更加得心应手,想要了解线程池的同学,可以看:线程池原理。

注:上面提到了生产者-消费者模式,大家可以参考生产者-消费者模型,可以更好的理解阻塞队列。

锁接口和类

前面我们介绍了Java原生的锁——基于对象的锁,它一般是配合synchronized关键字来使用的。实际上,Java在java.util.concurrent.locks包下,还为我们提供了几个关于锁的类和接口。它们有更强大的功能或更高的性能。

synchronized的不足之处

我们先来看看synchronized有什么不足之处。

  • 如果临界区是只读操作,其实可以多线程一起执行,但使用synchronized的话,同一时间只能有一个线程执行
  • synchronized无法知道线程有没有成功获取到锁
  • 使用synchronized,如果临界区因为IO或者sleep方法等原因阻塞了,而当前线程又没有释放锁,就会导致所有线程等待

而这些都是locks包下的锁可以解决的。

锁的几种分类

锁可以根据以下几种方式来进行分类,下面我们逐一介绍。

可重入锁和非可重入锁

所谓重入锁,顾名思义。就是支持重新进入的锁,也就是说这个锁支持一个线程对资源重复加锁

synchronized关键字就是使用的重入锁。比如说,你在一个synchronized实例方法里面调用另一个本实例的synchronized实例方法,它可以重新进入这个锁,不会出现任何异常。

如果我们自己在继承AQS实现同步器的时候,没有考虑到占有锁的线程再次获取锁的场景,可能就会导致线程阻塞,那这个就是一个“非可重入锁”。

ReentrantLock的中文意思就是可重入锁。也是本文后续要介绍的重点类。

公平锁与非公平锁

这里的“公平”,其实通俗意义来说就是“先来后到”,也就是FIFO。如果对一个锁来说,先对锁获取请求的线程一定会先被满足,后对锁获取请求的线程后被满足,那这个锁就是公平的。反之,那就是不公平的。

一般情况下,非公平锁能提升一定的效率。但是非公平锁可能会发生线程饥饿(有一些线程长时间得不到锁)的情况。所以要根据实际的需求来选择非公平锁和公平锁。

ReentrantLock支持非公平锁和公平锁两种。

读写锁和排它锁

我们前面讲到的synchronized用的锁和ReentrantLock,其实都是“排它锁”。也就是说,这些锁在同一时刻只允许一个线程进行访问。

而读写锁可以在同一时刻允许多个读线程访问。Java提供了ReentrantReadWriteLock类作为读写锁的默认实现,内部维护了两个锁:一个读锁,一个写锁。通过分离读锁和写锁,使得在“读多写少”的环境下,大大地提高了性能。

注意,即使用读写锁,在写线程访问时,所有的读线程和其它写线程均被阻塞。

可见,只是synchronized是远远不能满足多样化的业务对锁的要求的。接下来我们介绍一下JDK中有关锁的一些接口和类。

JDK中有关锁的一些接口和类

众所周知,JDK中关于并发的类大多都在java.util.concurrent(以下简称juc)包下。而juc.locks包看名字就知道,是提供了一些并发锁的工具类的。前面我们介绍的AQS(AbstractQueuedSynchronizer)就是在这个包下。下面分别介绍一下这个包下的类和接口以及它们之间的关系。

抽象类AQS/AQLS/AOS

这三个抽象类有一定的关系,所以这里放到一起讲。

首先我们看AQS(AbstractQueuedSynchronizer),之前专门有章节介绍这个类,它是在JDK 1.5 发布的,提供了一个“队列同步器”的基本功能实现。而AQS里面的“资源”是用一个int类型的数据来表示的,有时候我们的业务需求资源的数量超出了int的范围,所以在JDK 1.6 中,多了一个AQLS(AbstractQueuedLongSynchronizer)。它的代码跟AQS几乎一样,只是把资源的类型变成了long类型。

AQS和AQLS都继承了一个类叫AOS(AbstractOwnableSynchronizer)。这个类也是在JDK 1.6 中出现的。这个类只有几行简单的代码。从源码类上的注释可以知道,它是用于表示锁与持有者之间的关系(独占模式)。可以看一下它的主要方法:

1
2
3
4
5
6
7
8
9
10
11
12
// 独占模式,锁的持有者  
private transient Thread exclusiveOwnerThread;

// 设置锁持有者
protected final void setExclusiveOwnerThread(Thread t) {
exclusiveOwnerThread = t;
}

// 获取锁的持有线程
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}

接口Condition/Lock/ReadWriteLock

juc.locks包下共有三个接口:ConditionLockReadWriteLock。其中,Lock和ReadWriteLock从名字就可以看得出来,分别是锁和读写锁的意思。Lock接口里面有一些获取锁和释放锁的方法声明,而ReadWriteLock里面只有两个方法,分别返回“读锁”和“写锁”:

1
2
3
4
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}

Lock接口中有一个方法是可以获得一个Condition:

1
Condition newCondition();

之前我们提到了每个对象都可以用继承自Objectwait/notify方法来实现等待/通知机制。而Condition接口也提供了类似Object监视器的方法,通过与Lock配合来实现等待/通知模式。

那为什么既然有Object的监视器方法了,还要用Condition呢?这里有一个二者简单的对比:

对比项 Object监视器 Condition
前置条件 获取对象的锁 调用Lock.lock获取锁,调用Lock.newCondition获取Condition对象
调用方式 直接调用,比如object.notify() 直接调用,比如condition.await()
等待队列的个数 一个 多个
当前线程释放锁进入等待状态 支持 支持
当前线程释放锁进入等待状态,在等待状态中不中断 不支持 支持
当前线程释放锁并进入超时等待状态 支持 支持
当前线程释放锁并进入等待状态直到将来的某个时间 不支持 支持
唤醒等待队列中的一个线程 支持 支持
唤醒等待队列中的全部线程 支持 支持

Condition和Object的wait/notify基本相似。其中,Condition的await方法对应的是Object的wait方法,而Condition的signal/signalAll方法则对应Object的notify/notifyAll()。但Condition类似于Object的等待/通知机制的加强版。我们来看看主要的方法:

方法名称 描述
await() 当前线程进入等待状态直到被通知(signal)或者中断;当前线程进入运行状态并从await()方法返回的场景包括:(1)其他线程调用相同Condition对象的signal/signalAll方法,并且当前线程被唤醒;(2)其他线程调用interrupt方法中断当前线程;
awaitUninterruptibly() 当前线程进入等待状态直到被通知,在此过程中对中断信号不敏感,不支持中断当前线程
awaitNanos(long) 当前线程进入等待状态,直到被通知、中断或者超时。如果返回值小于等于0,可以认定就是超时了
awaitUntil(Date) 当前线程进入等待状态,直到被通知、中断或者超时。如果没到指定时间被通知,则返回true,否则返回false
signal() 唤醒一个等待在Condition上的线程,被唤醒的线程在方法返回前必须获得与Condition对象关联的锁
signalAll() 唤醒所有等待在Condition上的线程,能够从await()等方法返回的线程必须先获得与Condition对象关联的锁

ReentrantLock

ReentrantLock是一个非抽象类,它是Lock接口的JDK默认实现,实现了锁的基本功能。从名字上看,它是一个”可重入“锁,从源码上看,它内部有一个抽象类Sync,是继承了AQS,自己实现的一个同步器。同时,ReentrantLock内部有两个非抽象类NonfairSyncFairSync,它们都继承了Sync。从名字上看得出,分别是”非公平同步器“和”公平同步器“的意思。这意味着ReentrantLock可以支持”公平锁“和”非公平锁“。

通过看这两个同步器的源码可以发现,它们的实现都是”独占“的。都调用了AOS的setExclusiveOwnerThread方法,所以ReentrantLock的锁是”独占“的,也就是说,它的锁都是”排他锁“,不能共享。

在ReentrantLock的构造方法里,可以传入一个boolean类型的参数,来指定它是否是一个公平锁,默认情况下是非公平的。这个参数一旦实例化后就不能修改,只能通过isFair()方法来查看。

ReentrantReadWriteLock

这个类也是一个非抽象类,它是ReadWriteLock接口的JDK默认实现。它与ReentrantLock的功能类似,同样是可重入的,支持非公平锁和公平锁。不同的是,它还支持”读写锁“。

ReentrantReadWriteLock内部的结构大概是这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 内部结构
private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
// 具体实现
}
static final class NonfairSync extends Sync {
// 具体实现
}
static final class FairSync extends Sync {
// 具体实现
}
public static class ReadLock implements Lock, java.io.Serializable {
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
// 具体实现
}
public static class WriteLock implements Lock, java.io.Serializable {
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
// 具体实现
}

// 构造方法,初始化两个锁
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}

// 获取读锁和写锁的方法
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }

可以看到,它同样是内部维护了两个同步器。且维护了两个Lock的实现类ReadLock和WriteLock。从源码可以发现,这两个内部类用的是外部类的同步器。

ReentrantReadWriteLock实现了读写锁,但它有一个小弊端,就是在“写”操作的时候,其它线程不能写也不能读。我们称这种现象为“写饥饿”,将在后文的StampedLock类继续讨论这个问题。

StampedLock

StampedLock类是在Java 8 才发布的,也是Doug Lea大神所写,有人号称它为锁的性能之王。它没有实现Lock接口和ReadWriteLock接口,但它其实是实现了“读写锁”的功能,并且性能比ReentrantReadWriteLock更高。StampedLock还把读锁分为了“乐观读锁”和“悲观读锁”两种。

前面提到了ReentrantReadWriteLock会发生“写饥饿”的现象,但StampedLock不会。它是怎么做到的呢?它的核心思想在于,在读的时候如果发生了写,应该通过重试的方式来获取新的值,而不应该阻塞写操作。这种模式也就是典型的无锁编程思想,和CAS自旋的思想一样。这种操作方式决定了StampedLock在读线程非常多而写线程非常少的场景下非常适用,同时还避免了写饥饿情况的发生。

这里篇幅有限,就不介绍StampedLock的源码了,只是分析一下官方提供的用法(在JDK源码类声明的上方或Javadoc里可以找到)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class Point {
private double x, y;
private final StampedLock sl = new StampedLock();

// 写锁的使用
void move(double deltaX, double deltaY) {
long stamp = sl.writeLock(); // 获取写锁
try {
x += deltaX;
y += deltaY;
} finally {
sl.unlockWrite(stamp); // 释放写锁
}
}

// 乐观读锁的使用
double distanceFromOrigin() {
long stamp = sl.tryOptimisticRead(); // 获取乐观读锁
double currentX = x, currentY = y;
if (!sl.validate(stamp)) { // //检查乐观读锁后是否有其他写锁发生,有则返回false
stamp = sl.readLock(); // 获取一个悲观读锁
try {
currentX = x;
currentY = y;
} finally {
sl.unlockRead(stamp); // 释放悲观读锁
}
}
return Math.sqrt(currentX * currentX + currentY * currentY);
}

// 悲观读锁以及读锁升级写锁的使用
void moveIfAtOrigin(double newX, double newY) {
long stamp = sl.readLock(); // 悲观读锁
try {
while (x == 0.0 && y == 0.0) {
// 读锁尝试转换为写锁:转换成功后相当于获取了写锁,转换失败相当于有写锁被占用
long ws = sl.tryConvertToWriteLock(stamp);

if (ws != 0L) { // 如果转换成功
stamp = ws; // 读锁的票据更新为写锁的
x = newX;
y = newY;
break;
}
else { // 如果转换失败
sl.unlockRead(stamp); // 释放读锁
stamp = sl.writeLock(); // 强制获取写锁
}
}
} finally {
sl.unlock(stamp); // 释放所有锁
}
}
}

乐观读锁的意思就是先假定在这个锁获取期间,共享变量不会被改变,既然假定不会被改变,那就不需要上锁。在获取乐观读锁之后进行了一些操作,然后又调用了validate方法,这个方法就是用来验证tryOptimisticRead之后,是否有写操作执行过,如果有,则获取一个悲观读锁,这里的悲观读锁和ReentrantReadWriteLock中的读锁类似,也是个共享锁。

可以看到,StampedLock获取锁会返回一个long类型的变量,释放锁的时候再把这个变量传进去。简单看看源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 用于操作state后获取stamp的值
private static final int LG_READERS = 7;
private static final long RUNIT = 1L; //0000 0000 0001
private static final long WBIT = 1L << LG_READERS; //0000 1000 0000
private static final long RBITS = WBIT - 1L; //0000 0111 1111
private static final long RFULL = RBITS - 1L; //0000 0111 1110
private static final long ABITS = RBITS | WBIT; //0000 1111 1111
private static final long SBITS = ~RBITS; //1111 1000 0000

// 初始化时state的值
private static final long ORIGIN = WBIT << 1; //0001 0000 0000

// 锁共享变量state
private transient volatile long state;
// 读锁溢出时用来存储多出的读锁
private transient int readerOverflow;

StampedLock用这个long类型的变量的前7位(LG_READERS)来表示读锁,每获取一个悲观读锁,就加1(RUNIT),每释放一个悲观读锁,就减1。而悲观读锁最多只能装128个(7位限制),很容易溢出,所以用一个int类型的变量来存储溢出的悲观读锁。

写锁用state变量剩下的位来表示,每次获取一个写锁,就加0000 1000 0000(WBIT)。需要注意的是,写锁在释放的时候,并不是减WBIT,而是再加WBIT。这是为了让每次写锁都留下痕迹,解决CAS中的ABA问题,也为乐观锁检查变化validate方法提供基础。

乐观读锁就比较简单了,并没有真正改变state的值,而是在获取锁的时候记录state的写状态,在操作完成后去检查state的写状态部分是否发生变化,上文提到了,每次写锁都会留下痕迹,也是为了这里乐观锁检查变化提供方便。

总的来说,StampedLock的性能是非常优异的,基本上可以取代ReentrantReadWriteLock的作用。

并发容器集合

同步容器与并发容器

我们知道在java.util包下提供了一些容器类,而Vector和Hashtable是线程安全的容器类,但是这些容器实现同步的方式是通过对方法加锁(sychronized)方式实现的,这样读写均需要锁操作,导致性能低下。

而即使是Vector这样线程安全的类,在面对多线程下的复合操作的时候也是需要通过客户端加锁的方式保证原子性。如下面例子说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class TestVector {
private Vector<String> vector;

//方法一
public Object getLast(Vector vector) {
int lastIndex = vector.size() - 1;
return vector.get(lastIndex);
}

//方法二
public void deleteLast(Vector vector) {
int lastIndex = vector.size() - 1;
vector.remove(lastIndex);
}

//方法三
public Object getLastSysnchronized(Vector vector) {
synchronized(vector){
int lastIndex = vector.size() - 1;
return vector.get(lastIndex);
}
}

//方法四
public void deleteLastSysnchronized(Vector vector) {
synchronized (vector){
int lastIndex = vector.size() - 1;
vector.remove(lastIndex);
}
}

}

如果方法一和方法二为一个组合的话。那么当方法一获取到了vector的size之后,方法二已经执行完毕,这样就导致程序的错误。

如果方法三与方法四组合的话。通过锁机制保证了在vector上的操作的原子性。

并发容器是Java 5 提供的在多线程编程下用于代替同步容器,针对不同的应用场景进行设计,提高容器的并发访问性,同时定义了线程安全的复合操作。

并发容器类介绍

整体架构(列举常用的容器类)

并发容器

其中,阻塞队列(BlockingQueue)在有介绍,CopyOnWrite容器(CopyOnWritexxx)在有介绍,这里不做过多介绍。

下面分别介绍一些常用的并发容器类和接口,因篇幅原因,这里只介绍这些类的用途和基本的原理,不做过多的源码解析。

并发Map

ConcurrentMap接口

ConcurrentMap接口继承了Map接口,在Map接口的基础上又定义了四个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface ConcurrentMap<K, V> extends Map<K, V> {

//插入元素
V putIfAbsent(K key, V value);

//移除元素
boolean remove(Object key, Object value);

//替换元素
boolean replace(K key, V oldValue, V newValue);

//替换元素
V replace(K key, V value);

}

putIfAbsent:与原有put方法不同的是,putIfAbsent方法中如果插入的key相同,则不替换原有的value值;

remove:与原有remove方法不同的是,新remove方法中增加了对value的判断,如果要删除的key-value不能与Map中原有的key-value对应上,则不会删除该元素;

replace(K,V,V):增加了对value值的判断,如果key-oldValue能与Map中原有的key-value对应上,才进行替换操作;

replace(K,V):与上面的replace不同的是,此replace不会对Map中原有的key-value进行比较,如果key存在则直接替换;

ConcurrentHashMap类

ConcurrentHashMap同HashMap一样也是基于散列表的map,但是它提供了一种与Hashtable完全不同的加锁策略,提供更高效的并发性和伸缩性。

ConcurrentHashMap在JDK 1.7 和JDK 1.8中有一些区别。这里我们分开介绍一下。

JDK 1.7

ConcurrentHashMap在JDK 1.7中,提供了一种粒度更细的加锁机制来实现在多线程下更高的性能,这种机制叫分段锁(Lock Striping)。

提供的优点是:在并发环境下将实现更高的吞吐量,而在单线程环境下只损失非常小的性能。

可以这样理解分段锁,就是将数据分段,对每一段数据分配一把锁。当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。

有些方法需要跨段,比如size()、isEmpty()、containsValue(),它们可能需要锁定整个表而不仅仅是某个段,这需要按顺序锁定所有段,操作完毕后,又按顺序释放所有段的锁。如下图:

分段锁机制

ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁ReentrantLock,HashEntry则用于存储键值对数据。

一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构, 一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素, 每个Segment守护着一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁。

JDK 1.8

而在JDK 1.8中,ConcurrentHashMap主要做了两个优化:

  • 同HashMap一样,链表也会在长度达到8的时候转化为红黑树,这样可以提升大量冲突时候的查询效率;
  • 以某个位置的头结点(链表的头结点或红黑树的root结点)为锁,配合自旋+CAS避免不必要的锁开销,进一步提升并发性能。

ConcurrentNavigableMap接口与ConcurrentSkipListMap类

ConcurrentNavigableMap接口继承了NavigableMap接口,这个接口提供了针对给定搜索目标返回最接近匹配项的导航方法。

ConcurrentNavigableMap接口的主要实现类是ConcurrentSkipListMap类。从名字上来看,它的底层使用的是跳表(SkipList)的数据结构。关于跳表的数据结构这里不做太多介绍,它是一种”空间换时间“的数据结构,可以使用CAS来保证并发安全性。

并发Queue

JDK并没有提供线程安全的List类,因为对List来说,很难去开发一个通用并且没有并发瓶颈的线程安全的List。因为即使简单的读操作,拿contains() 这样一个操作来说,很难想到搜索的时候如何避免锁住整个list。

所以退一步,JDK提供了对队列和双端队列的线程安全的类:ConcurrentLinkedQueue和ConcurrentLinkedDeque。因为队列相对于List来说,有更多的限制。这两个类是使用CAS来实现线程安全的。

并发Set

JDK提供了ConcurrentSkipListSet,是线程安全的有序的集合。底层是使用ConcurrentSkipListMap实现。

谷歌的guava框架实现了一个线程安全的ConcurrentHashSet:

1
Set<String> s = Sets.newConcurrentHashSet();

CopyOnWrite容器

什么是CopyOnWrite容器

在说到CopyOnWrite容器之前我们先来谈谈什么是CopyOnWrite机制,CopyOnWrite是计算机设计领域中的一种优化策略,也是一种在并发场景下常用的设计思想——写入时复制思想。

那什么是写入时复制思想呢?就是当有多个调用者同时去请求一个资源数据的时候,有一个调用者出于某些原因需要对当前的数据源进行修改,这个时候系统将会复制一个当前数据源的副本给调用者修改。

CopyOnWrite容器即写时复制的容器,当我们往一个容器中添加元素的时候,不直接往容器中添加,而是将当前容器进行copy,复制出来一个新的容器,然后向新容器中添加我们需要的元素,最后将原容器的引用指向新容器。

这样做的好处在于,我们可以在并发的场景下对容器进行”读操作”而不需要”加锁”,从而达到读写分离的目的。从JDK 1.5 开始Java并发包里提供了两个使用CopyOnWrite机制实现的并发容器 ,分别是CopyOnWriteArrayList和CopyOnWriteArraySet 。我们着重给大家介绍一下CopyOnWriteArrayList。

CopyOnWriteArrayList

优点: CopyOnWriteArrayList经常被用于“读多写少”的并发场景,是因为CopyOnWriteArrayList无需任何同步措施,大大增强了读的性能。在Java中遍历线程非安全的List(如:ArrayList和 LinkedList)的时候,若中途有别的线程对List容器进行修改,那么会抛出ConcurrentModificationException异常。CopyOnWriteArrayList由于其”读写分离”,遍历和修改操作分别作用在不同的List容器,所以在使用迭代器遍历的时候,则不会抛出异常。

缺点: 第一个缺点是CopyOnWriteArrayList每次执行写操作都会将原容器进行拷贝一份,数据量大的时候,内存会存在较大的压力,可能会引起频繁Full GC(ZGC因为没有使用Full GC)。比如这些对象占用的内存200M左右,那么再写入100M数据进去,内存就会多占用300M。

第二个缺点是CopyOnWriteArrayList由于实现的原因,写和读分别作用在不同新老容器上,在写操作执行过程中,读不会阻塞,但读取到的却是老容器的数据。

现在我们来看一下CopyOnWriteArrayList的add操作源码,它的逻辑很清晰,就是先把原容器进行copy,然后在新的副本上进行“写操作”,最后再切换引用,在此过程中是加了锁的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public boolean add(E e) {

// ReentrantLock加锁,保证线程安全
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
// 拷贝原容器,长度为原容器长度加一
Object[] newElements = Arrays.copyOf(elements, len + 1);
// 在新副本上执行添加操作
newElements[len] = e;
// 将原容器引用指向新副本
setArray(newElements);
return true;
} finally {
// 解锁
lock.unlock();
}
}

我们再来看一下remove操作的源码,remove的逻辑是将要remove元素之外的其他元素拷贝到新的副本中,然后再将原容器的引用指向新的副本中,因为remove操作也是“写操作”所以也是要加锁的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public E remove(int index) {

// 加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
E oldValue = get(elements, index);
int numMoved = len - index - 1;
if (numMoved == 0)
// 如果要删除的是列表末端数据,拷贝前len-1个数据到新副本上,再切换引用
setArray(Arrays.copyOf(elements, len - 1));
else {
// 否则,将要删除元素之外的其他元素拷贝到新副本中,并切换引用
Object[] newElements = new Object[len - 1];
System.arraycopy(elements, 0, newElements, 0, index);
System.arraycopy(elements, index + 1, newElements, index,
numMoved);
setArray(newElements);
}
return oldValue;
} finally {
// 解锁
lock.unlock();
}
}

我们再来看看CopyOnWriteArrayList效率最高的读操作的源码

1
2
3
4
5
6
public E get(int index) {
return get(getArray(), index);
}
private E get(Object[] a, int index) {
return (E) a[index];
}

由上可见“读操作”是没有加锁,直接读取。

CopyOnWrite的业务中实现

接下来,我们结合具体业务场景来实现一个CopyOnWriteMap的并发容器并且使用它。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import java.util.Collection;
import java.util.Map;
import java.util.Set;

public class CopyOnWriteMap<K, V> implements Map<K, V>, Cloneable {
private volatile Map<K, V> internalMap;

public CopyOnWriteMap() {
internalMap = new HashMap<K, V>();
}

public V put(K key, V value) {
synchronized (this) {
Map<K, V> newMap = new HashMap<K, V>(internalMap);
V val = newMap.put(key, value);
internalMap = newMap;
return val;
}
}

public V get(Object key) {
return internalMap.get(key);
}

public void putAll(Map<? extends K, ? extends V> newData) {
synchronized (this) {
Map<K, V> newMap = new HashMap<K, V>(internalMap);
newMap.putAll(newData);
internalMap = newMap;
}
}
}

上面就是参考CopyOnWriteArrayList实现的CopyOnWriteMap,我们可以用这个容器来做什么呢?结合我们之前说的CopyOnWrite的复制思想,它最适用于“读多写少”的并发场景。

场景:假如我们有一个搜索的网站需要屏蔽一些“关键字”,“黑名单”每晚定时更新,每当用户搜索的时候,“黑名单”中的关键字不会出现在搜索结果当中,并且提示用户敏感字。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 黑名单服务
public class BlackListServiceImpl {
// 减少扩容开销。根据实际需要,初始化CopyOnWriteMap的大小,避免写时CopyOnWriteMap扩容的开销。
private static CopyOnWriteMap<String, Boolean> blackListMap =
new CopyOnWriteMap<String, Boolean>(1000);

public static boolean isBlackList(String id) {
return blackListMap.get(id) == null ? false : true;
}

public static void addBlackList(String id) {
blackListMap.put(id, Boolean.TRUE);
}

/**
* 批量添加黑名单
* (使用批量添加。因为每次添加,容器每次都会进行复制,所以减少添加次数,可以减少容器的复制次数。
* 如使用上面代码里的addBlackList方法)
* @param ids
*/
public static void addBlackList(Map<String,Boolean> ids) {
blackListMap.putAll(ids);
}

}

这里需要各位小伙伴特别特别注意一个问题,此处的场景是每晚凌晨“黑名单”定时更新,原因是CopyOnWrite容器有数据一致性的问题,它只能保证最终数据一致性

所以如果我们希望写入的数据马上能准确地读取,请不要使用CopyOnWrite容器。

通信工具类

JDK中提供了一些工具类以供开发者使用。这样的话我们在遇到一些常见的应用场景时就可以使用这些工具类,而不用自己再重复造轮子了。

它们都在java.util.concurrent包下。先总体概括一下都有哪些工具类,它们有什么作用,然后再分别介绍它们的主要使用方法和原理。

作用
Semaphore 限制线程的数量
Exchanger 两个线程交换数据
CountDownLatch 线程等待直到计数器减为0时开始工作
CyclicBarrier 作用跟CountDownLatch类似,但是可以重复使用
Phaser 增强的CyclicBarrier

下面分别介绍这几个类。

Semaphore

Semaphore介绍

Semaphore翻译过来是信号的意思。顾名思义,这个工具类提供的功能就是多个线程彼此“打信号”。而这个“信号”是一个int类型的数据,也可以看成是一种“资源”。

可以在构造函数中传入初始资源总数,以及是否使用“公平”的同步器。默认情况下,是非公平的。

1
2
3
4
5
6
7
8
// 默认情况下使用非公平
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

最主要的方法是acquire方法和release方法。acquire()方法会申请一个permit,而release方法会释放一个permit。当然,你也可以申请多个acquire(int permits)或者释放多个release(int permits)。

每次acquire,permits就会减少一个或者多个。如果减少到了0,再有其他线程来acquire,那就要阻塞这个线程直到有其它线程release permit为止。

Semaphore案例

Semaphore往往用于资源有限的场景中,去限制线程的数量。举个例子,我想限制同时只能有3个线程在工作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class SemaphoreDemo {
static class MyThread implements Runnable {

private int value;
private Semaphore semaphore;

public MyThread(int value, Semaphore semaphore) {
this.value = value;
this.semaphore = semaphore;
}

@Override
public void run() {
try {
semaphore.acquire(); // 获取permit
System.out.println(String.format("当前线程是%d, 还剩%d个资源,还有%d个线程在等待",
value, semaphore.availablePermits(), semaphore.getQueueLength()));
// 睡眠随机时间,打乱释放顺序
Random random =new Random();
Thread.sleep(random.nextInt(1000));
System.out.println(String.format("线程%d释放了资源", value));
} catch (InterruptedException e) {
e.printStackTrace();
} finally{
semaphore.release(); // 释放permit
}
}
}

public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
for (int i = 0; i < 10; i++) {
new Thread(new MyThread(i, semaphore)).start();
}
}
}

输出:

当前线程是1, 还剩2个资源,还有0个线程在等待
当前线程是0, 还剩1个资源,还有0个线程在等待
当前线程是6, 还剩0个资源,还有0个线程在等待
线程6释放了资源
当前线程是2, 还剩0个资源,还有6个线程在等待
线程2释放了资源
当前线程是4, 还剩0个资源,还有5个线程在等待
线程0释放了资源
当前线程是7, 还剩0个资源,还有4个线程在等待
线程1释放了资源
当前线程是8, 还剩0个资源,还有3个线程在等待
线程7释放了资源
当前线程是5, 还剩0个资源,还有2个线程在等待
线程4释放了资源
当前线程是3, 还剩0个资源,还有1个线程在等待
线程8释放了资源
当前线程是9, 还剩0个资源,还有0个线程在等待
线程9释放了资源
线程5释放了资源
线程3释放了资源

可以看到,在这次运行中,最开始是1, 0, 6这三个线程获得了资源,而其它线程进入了等待队列。然后当某个线程释放资源后,就会有等待队列中的线程获得资源。

当然,Semaphore默认的acquire方法是会让线程进入等待队列,且会抛出中断异常。但它还有一些方法可以忽略中断或不进入阻塞队列:

1
2
3
4
5
6
7
8
9
10
// 忽略中断
public void acquireUninterruptibly()
public void acquireUninterruptibly(int permits)

// 不进入等待队列,底层使用CAS
public boolean tryAcquire
public boolean tryAcquire(int permits)
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException
public boolean tryAcquire(long timeout, TimeUnit unit)

Semaphore原理

Semaphore内部有一个继承了AQS的同步器Sync,重写了tryAcquireShared方法。在这个方法里,会去尝试获取资源。

如果获取失败(想要的资源数量小于目前已有的资源数量),就会返回一个负数(代表尝试获取资源失败)。然后当前线程就会进入AQS的等待队列。

Exchanger

Exchanger类用于两个线程交换数据。它支持泛型,也就是说你可以在两个线程之间传送任何数据。先来一个案例看看如何使用,比如两个线程之间想要传送字符串:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ExchangerDemo {
public static void main(String[] args) throws InterruptedException {
Exchanger<String> exchanger = new Exchanger<>();

new Thread(() -> {
try {
System.out.println("这是线程A,得到了另一个线程的数据:"
+ exchanger.exchange("这是来自线程A的数据"));
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

System.out.println("这个时候线程A是阻塞的,在等待线程B的数据");
Thread.sleep(1000);

new Thread(() -> {
try {
System.out.println("这是线程B,得到了另一个线程的数据:"
+ exchanger.exchange("这是来自线程B的数据"));
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}

输出:

这个时候线程A是阻塞的,在等待线程B的数据
这是线程B,得到了另一个线程的数据:这是来自线程A的数据
这是线程A,得到了另一个线程的数据:这是来自线程B的数据

可以看到,当一个线程调用exchange方法后,它是处于阻塞状态的,只有当另一个线程也调用了exchange方法,它才会继续向下执行。看源码可以发现它是使用park/unpark来实现等待状态的切换的,但是在使用park/unpark方法之前,使用了CAS检查,估计是为了提高性能。

Exchanger一般用于两个线程之间更方便地在内存中交换数据,因为其支持泛型,所以我们可以传输任何的数据,比如IO流或者IO缓存。根据JDK里面的注释的说法,可以总结为一下特性:

  • 此类提供对外的操作是同步的;
  • 用于成对出现的线程之间交换数据;
  • 可以视作双向的同步队列;
  • 可应用于基因算法、流水线设计等场景。

Exchanger类还有一个有超时参数的方法,如果在指定时间内没有另一个线程调用exchange,就会抛出一个超时异常。

1
public V exchange(V x, long timeout, TimeUnit unit)

那么问题来了,Exchanger只能是两个线程交换数据吗?那三个调用同一个实例的exchange方法会发生什么呢?答案是只有前两个线程会交换数据,第三个线程会进入阻塞状态。

需要注意的是,exchange是可以重复使用的。也就是说。两个线程可以使用Exchanger在内存中不断地再交换数据。

CountDownLatch

CountDownLatch介绍

先来解读一下CountDownLatch这个类名字的意义。CountDown代表计数递减,Latch是“门闩”的意思。也有人把它称为“屏障”。而CountDownLatch这个类的作用也很贴合这个名字的意义,假设某个线程在执行任务之前,需要等待其它线程完成一些前置任务,必须等所有的前置任务都完成,才能开始执行本线程的任务。

CountDownLatch的方法也很简单,如下:

1
2
3
4
5
6
7
// 构造方法:
public CountDownLatch(int count)

public void await() // 等待
public boolean await(long timeout, TimeUnit unit) // 超时等待
public void countDown() // count - 1
public long getCount() // 获取当前还有多少count

CountDownLatch案例

我们知道,玩游戏的时候,在游戏真正开始之前,一般会等待一些前置任务完成,比如“加载地图数据”,“加载人物模型”,“加载背景音乐”等等。只有当所有的东西都加载完成后,玩家才能真正进入游戏。下面我们就来模拟一下这个demo。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class CountDownLatchDemo {
// 定义前置任务线程
static class PreTaskThread implements Runnable {

private String task;
private CountDownLatch countDownLatch;

public PreTaskThread(String task, CountDownLatch countDownLatch) {
this.task = task;
this.countDownLatch = countDownLatch;
}

@Override
public void run() {
try {
Random random = new Random();
Thread.sleep(random.nextInt(1000));
System.out.println(task + " - 任务完成");
countDownLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) {
// 假设有三个模块需要加载
CountDownLatch countDownLatch = new CountDownLatch(3);

// 主任务
new Thread(() -> {
try {
System.out.println("等待数据加载...");
System.out.println(String.format("还有%d个前置任务", countDownLatch.getCount()));
countDownLatch.await();
System.out.println("数据加载完成,正式开始游戏!");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

// 前置任务
new Thread(new PreTaskThread("加载地图数据", countDownLatch)).start();
new Thread(new PreTaskThread("加载人物模型", countDownLatch)).start();
new Thread(new PreTaskThread("加载背景音乐", countDownLatch)).start();
}
}

输出:

等待数据加载…
还有3个前置任务
加载人物模型 - 任务完成
加载背景音乐 - 任务完成
加载地图数据 - 任务完成
数据加载完成,正式开始游戏!

CountDownLatch原理

其实CountDownLatch类的原理挺简单的,内部同样是一个继承了AQS的实现类Sync,且实现起来还很简单,可能是JDK里面AQS的子类中最简单的实现了,有兴趣的读者可以去看看这个内部类的源码。

需要注意的是构造器中的计数值(count)实际上就是闭锁需要等待的线程数量。这个值只能被设置一次,而且CountDownLatch没有提供任何机制去重新设置这个计数值

CyclicBarrier

CyclicBarrier介绍

CyclicBarrirer从名字上来理解是“循环的屏障”的意思。前面提到了CountDownLatch一旦计数值count被降为0后,就不能再重新设置了,它只能起一次“屏障”的作用。而CyclicBarrier拥有CountDownLatch的所有功能,还可以使用reset()方法重置屏障。

CyclicBarrier Barrier被破坏

如果参与者(线程)在等待的过程中,Barrier被破坏,就会抛出BrokenBarrierException。可以用isBroken()方法检测Barrier是否被破坏。

  1. 如果有线程已经处于等待状态,调用reset方法会导致已经在等待的线程出现BrokenBarrierException异常。并且由于出现了BrokenBarrierException,将会导致始终无法等待。
  2. 如果在等待的过程中,线程被中断,会抛出InterruptedException异常,并且这个异常会传播到其他所有的线程。
  3. 如果在执行屏障操作过程中发生异常,则该异常将传播到当前线程中,其他线程会抛出BrokenBarrierException,屏障被损坏。
  4. 如果超出指定的等待时间,当前线程会抛出 TimeoutException 异常,其他线程会抛出BrokenBarrierException异常。

CyclicBarrier案例

我们同样用玩游戏的例子。如果玩一个游戏有多个“关卡”,那使用CountDownLatch显然不太合适,那需要为每个关卡都创建一个实例。那我们可以使用CyclicBarrier来实现每个关卡的数据加载等待功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class CyclicBarrierDemo {
static class PreTaskThread implements Runnable {

private String task;
private CyclicBarrier cyclicBarrier;

public PreTaskThread(String task, CyclicBarrier cyclicBarrier) {
this.task = task;
this.cyclicBarrier = cyclicBarrier;
}

@Override
public void run() {
// 假设总共三个关卡
for (int i = 1; i < 4; i++) {
try {
Random random = new Random();
Thread.sleep(random.nextInt(1000));
System.out.println(String.format("关卡%d的任务%s完成", i, task));
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
cyclicBarrier.reset(); // 重置屏障
}
}
}

public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
System.out.println("本关卡所有前置任务完成,开始游戏...");
});

new Thread(new PreTaskThread("加载地图数据", cyclicBarrier)).start();
new Thread(new PreTaskThread("加载人物模型", cyclicBarrier)).start();
new Thread(new PreTaskThread("加载背景音乐", cyclicBarrier)).start();
}
}

输出:

关卡1的任务加载地图数据完成
关卡1的任务加载背景音乐完成
关卡1的任务加载人物模型完成
本关卡所有前置任务完成,开始游戏…
关卡2的任务加载地图数据完成
关卡2的任务加载背景音乐完成
关卡2的任务加载人物模型完成
本关卡所有前置任务完成,开始游戏…
关卡3的任务加载人物模型完成
关卡3的任务加载地图数据完成
关卡3的任务加载背景音乐完成
本关卡所有前置任务完成,开始游戏…

注意这里跟CountDownLatch的代码有一些不同。CyclicBarrier没有分为await()countDown(),而是只有单独的一个await()方法。

一旦调用await()方法的线程数量等于构造方法中传入的任务总量(这里是3),就代表达到屏障了。CyclicBarrier允许我们在达到屏障的时候可以执行一个任务,可以在构造方法传入一个Runnable类型的对象。上述案例就是在达到屏障时,输出“本关卡所有前置任务完成,开始游戏…”。

1
2
3
4
5
6
7
// 构造方法
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
// 具体实现
}

CyclicBarrier原理

CyclicBarrier虽说功能与CountDownLatch类似,但是实现原理却完全不同,CyclicBarrier内部使用的是Lock + Condition实现的等待/通知模式。详情可以查看这个方法的源码:

1
private int dowait(boolean timed, long nanos)

Phaser

Phaser介绍

Phaser这个单词是“移相器,相位器”的意思(好吧,笔者并不懂这是什么玩意,下方资料来自百度百科)。这个类是从JDK 1.7 中出现的。

移相器(Phaser)能够对波的相位进行调整的一种装置。任何传输介质对在其中传导的波动都会引入相移,这是早期模拟移相器的原理;现代电子技术发展后利用A/D、D/A转换实现了数字移相,顾名思义,它是一种不连续的移相技术,但特点是移相精度高。 移相器在雷达、导弹姿态控制、加速器、通信、仪器仪表甚至于音乐等领域都有着广泛的应用

Phaser类有点复杂,这里只介绍一些基本的用法和知识点。详情可以查看JDK文档,文档里有这个类非常详尽的介绍。

前面我们介绍了CyclicBarrier,可以发现它在构造方法里传入“任务总量”parties之后,就不能修改这个值了,并且每次调用await()方法也只能消耗一个parties计数。但Phaser可以动态地调整任务总量!

名词解释:

  • party:对应一个线程,数量可以通过register或者构造参数传入;
  • arrive:对应一个party的状态,初始时是unarrived,当调用arriveAndAwaitAdvance()或者 arriveAndDeregister()进入arrive状态,可以通过getUnarrivedParties()获取当前未到达的数量;
  • register:注册一个party,每一阶段必须所有注册的party都到达才能进入下一阶段;
  • deRegister:减少一个party。
  • phase:阶段,当所有注册的party都arrive之后,将会调用Phaser的onAdvance()方法来判断是否要进入下一阶段。

Phaser终止的两种途径,Phaser维护的线程执行完毕或者onAdvance()返回true 此外Phaser还能维护一个树状的层级关系,构造的时候new Phaser(parentPhaser),对于Task执行时间短的场景(竞争激烈),也就是说有大量的party, 那可以把每个Phaser的任务量设置较小,多个Phaser共同继承一个父Phaser。

Phasers with large numbers of parties that would otherwise experience heavy synchronization contention costs may instead be set up so that groups of sub-phasers share a common parent. This may greatly increase throughput even though it incurs greater per-operation overhead.

翻译:如果有大量的party,那许多线程可能同步的竞争成本比较高。所以可以拆分成多个子Phaser共享一个共同的父Phaser。这可能会大大增加吞吐量,即使它会带来更多的每次操作开销。

Phaser案例

还是游戏的案例。假设我们游戏有三个关卡,但只有第一个关卡有新手教程,需要加载新手教程模块。但后面的第二个关卡和第三个关卡都不需要。我们可以用Phaser来做这个需求。

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class PhaserDemo {
static class PreTaskThread implements Runnable {

private String task;
private Phaser phaser;

public PreTaskThread(String task, Phaser phaser) {
this.task = task;
this.phaser = phaser;
}

@Override
public void run() {
for (int i = 1; i < 4; i++) {
try {
// 第二次关卡起不加载NPC,跳过
if (i >= 2 && "加载新手教程".equals(task)) {
continue;
}
Random random = new Random();
Thread.sleep(random.nextInt(1000));
System.out.println(String.format("关卡%d,需要加载%d个模块,当前模块【%s】",
i, phaser.getRegisteredParties(), task));

// 从第二个关卡起,不加载NPC
if (i == 1 && "加载新手教程".equals(task)) {
System.out.println("下次关卡移除加载【新手教程】模块");
phaser.arriveAndDeregister(); // 移除一个模块
} else {
phaser.arriveAndAwaitAdvance();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

public static void main(String[] args) {
Phaser phaser = new Phaser(4) {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println(String.format("第%d次关卡准备完成", phase + 1));
return phase == 3 || registeredParties == 0;
}
};

new Thread(new PreTaskThread("加载地图数据", phaser)).start();
new Thread(new PreTaskThread("加载人物模型", phaser)).start();
new Thread(new PreTaskThread("加载背景音乐", phaser)).start();
new Thread(new PreTaskThread("加载新手教程", phaser)).start();
}
}

输出:

关卡1,需要加载4个模块,当前模块【加载背景音乐】
关卡1,需要加载4个模块,当前模块【加载新手教程】
下次关卡移除加载【新手教程】模块
关卡1,需要加载3个模块,当前模块【加载地图数据】
关卡1,需要加载3个模块,当前模块【加载人物模型】
第1次关卡准备完成
关卡2,需要加载3个模块,当前模块【加载地图数据】
关卡2,需要加载3个模块,当前模块【加载背景音乐】
关卡2,需要加载3个模块,当前模块【加载人物模型】
第2次关卡准备完成
关卡3,需要加载3个模块,当前模块【加载人物模型】
关卡3,需要加载3个模块,当前模块【加载地图数据】
关卡3,需要加载3个模块,当前模块【加载背景音乐】
第3次关卡准备完成

这里要注意关卡1的输出,在“加载新手教程”线程中调用了arriveAndDeregister()减少一个party之后,后面的线程使用getRegisteredParties()得到的是已经被修改后的parties了。但是当前这个阶段(phase),仍然是需要4个parties都arrive才触发屏障的。从下一个阶段开始,才需要3个parties都arrive就触发屏障。

另外Phaser类用来控制某个阶段的线程数量很有用,但它并在意这个阶段具体有哪些线程arrive,只要达到它当前阶段的parties值,就触发屏障。所以我这里的案例虽然制定了特定的线程(加载新手教程)来更直观地表述Phaser的功能,但是其实Phaser是没有分辨具体是哪个线程的功能的,它在意的只是数量,这一点需要读者注意。

Phaser原理

Phaser类的原理相比起来要复杂得多。它内部使用了两个基于Fork-Join框架的原子类辅助:

1
2
3
4
5
6
private final AtomicReference<QNode> evenQ;
private final AtomicReference<QNode> oddQ;

static final class QNode implements ForkJoinPool.ManagedBlocker {
// 实现代码
}

有兴趣的读者可以去看看JDK源代码,这里不做过多叙述。

总的来说,CountDownLatch,CyclicBarrier,Phaser是一个比一个强大,但也一个比一个复杂。根据自己的业务需求合理选择即可。

Fork/Join框架

什么是Fork/Join

Fork/Join框架是一个实现了ExecutorService接口的多线程处理器,它专为那些可以通过递归分解成更细小的任务而设计,最大化的利用多核处理器来提高应用程序的性能。

与其他ExecutorService相关的实现相同的是,Fork/Join框架会将任务分配给线程池中的线程。而与之不同的是,Fork/Join框架在执行任务时使用了工作窃取算法

fork在英文里有分叉的意思,join在英文里连接、结合的意思。顾名思义,fork就是要使一个大任务分解成若干个小任务,而join就是最后将各个小任务的结果结合起来得到大任务的结果。

Fork/Join的运行流程大致如下所示:

fork_join流程图

需要注意的是,图里的次级子任务可以一直分下去,一直分到子任务足够小为止。用伪代码来表示如下:

1
2
3
4
5
6
7
8
solve(任务):
if(任务已经划分到足够小):
顺序执行任务
else:
for(划分任务得到子任务)
solve(子任务)
结合所有子任务的结果到上一层循环
return 最终结合的结果

通过上面伪代码可以看出,我们通过递归嵌套的计算得到最终结果,这里有体现分而治之(divide and conquer) 的算法思想。

工作窃取算法

工作窃取算法指的是在多线程执行不同任务队列的过程中,某个线程执行完自己队列的任务后从其他线程的任务队列里窃取任务来执行。

工作窃取流程如下图所示:

工作窃取算法运行流程图

值得注意的是,当一个线程窃取另一个线程的时候,为了减少两个任务线程之间的竞争,我们通常使用双端队列来存储任务。被窃取的任务线程都从双端队列的头部拿任务执行,而窃取其他任务的线程从双端队列的尾部执行任务。

另外,当一个线程在窃取任务时要是没有其他可用的任务了,这个线程会进入阻塞状态以等待再次“工作”。

Fork/Join的具体实现

前面我们说Fork/Join框架简单来讲就是对任务的分割与子任务的合并,所以要实现这个框架,先得有任务。在Fork/Join框架里提供了抽象类ForkJoinTask来实现任务。

ForkJoinTask

ForkJoinTask是一个类似普通线程的实体,但是比普通线程轻量得多。

fork()方法:使用线程池中的空闲线程异步提交任务

1
2
3
4
5
6
7
8
9
10
11
12
13
// 本文所有代码都引自Java 8
public final ForkJoinTask<V> fork() {
Thread t;
// ForkJoinWorkerThread是执行ForkJoinTask的专有线程,由ForkJoinPool管理
// 先判断当前线程是否是ForkJoin专有线程,如果是,则将任务push到当前线程所负责的队列里去
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
// 如果不是则将线程加入队列
// 没有显式创建ForkJoinPool的时候走这里,提交任务到默认的common线程池中
ForkJoinPool.common.externalPush(this);
return this;
}

其实fork()只做了一件事,那就是把任务推入当前工作线程的工作队列里

join()方法:等待处理任务的线程处理完毕,获得返回值。

来看下join()的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public final V join() {
int s;
// doJoin()方法来获取当前任务的执行状态
if ((s = doJoin() & DONE_MASK) != NORMAL)
// 任务异常,抛出异常
reportException(s);
// 任务正常完成,获取返回值
return getRawResult();
}

/**
* doJoin()方法用来返回当前任务的执行状态
**/
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
// 先判断任务是否执行完毕,执行完毕直接返回结果(执行状态)
return (s = status) < 0 ? s :
// 如果没有执行完毕,先判断是否是ForkJoinWorkThread线程
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
// 如果是,先判断任务是否处于工作队列顶端(意味着下一个就执行它)
// tryUnpush()方法判断任务是否处于当前工作队列顶端,是返回true
// doExec()方法执行任务
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
// 如果是处于顶端并且任务执行完毕,返回结果
tryUnpush(this) && (s = doExec()) < 0 ? s :
// 如果不在顶端或者在顶端却没未执行完毕,那就调用awitJoin()执行任务
// awaitJoin():使用自旋使任务执行完成,返回结果
wt.pool.awaitJoin(w, this, 0L) :
// 如果不是ForkJoinWorkThread线程,执行externalAwaitDone()返回任务结果
externalAwaitDone();
}

我们在之前介绍过说Thread.join()会使线程阻塞,而ForkJoinPool.join()会使线程免于阻塞,下面是ForkJoinPool.join()的流程图:

join流程图

RecursiveAction和RecursiveTask

通常情况下,在创建任务的时候我们一般不直接继承ForkJoinTask,而是继承它的子类RecursiveActionRecursiveTask

两个都是ForkJoinTask的子类,RecursiveAction可以看做是无返回值的ForkJoinTask,RecursiveTask是有返回值的ForkJoinTask

此外,两个子类都有执行主要计算的方法compute(),当然,RecursiveAction的compute()返回void,RecursiveTask的compute()有具体的返回值。

ForkJoinPool

ForkJoinPool是用于执行ForkJoinTask任务的执行(线程)池。

ForkJoinPool管理着执行池中的线程和任务队列,此外,执行池是否还接受任务,显示线程的运行状态也是在这里处理。

我们来大致看下ForkJoinPool的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@sun.misc.Contended
public class ForkJoinPool extends AbstractExecutorService {
// 任务队列
volatile WorkQueue[] workQueues;

// 线程的运行状态
volatile int runState;

// 创建ForkJoinWorkerThread的默认工厂,可以通过构造函数重写
public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;

// 公用的线程池,其运行状态不受shutdown()和shutdownNow()的影响
static final ForkJoinPool common;

// 私有构造方法,没有任何安全检查和参数校验,由makeCommonPool直接调用
// 其他构造方法都是源自于此方法
// parallelism: 并行度,
// 默认调用java.lang.Runtime.availableProcessors() 方法返回可用处理器的数量
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory, // 工作线程工厂
UncaughtExceptionHandler handler, // 拒绝任务的handler
int mode, // 同步模式
String workerNamePrefix) { // 线程名prefix
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.config = (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}

}

WorkQueue

双端队列,ForkJoinTask存放在这里。

当工作线程在处理自己的工作队列时,会从队列首取任务来执行(FIFO);如果是窃取其他队列的任务时,窃取的任务位于所属任务队列的队尾(LIFO)。

ForkJoinPool与传统线程池最显著的区别就是它维护了一个工作队列数组(volatile WorkQueue[] workQueues,ForkJoinPool中的每个工作线程都维护着一个工作队列)。

runState

ForkJoinPool的运行状态。SHUTDOWN状态用负数表示,其他用2的幂次表示。

Fork/Join的使用

上面我们说ForkJoinPool负责管理线程和任务,ForkJoinTask实现fork和join操作,所以要使用Fork/Join框架就离不开这两个类了,只是在实际开发中我们常用ForkJoinTask的子类RecursiveTask 和RecursiveAction来替代ForkJoinTask。

下面我们用一个计算斐波那契数列第n项的例子来看一下Fork/Join的使用:

斐波那契数列数列是一个线性递推数列,从第三项开始,每一项的值都等于前两项之和:

1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89······

如果设f(n)为该数列的第n项(n∈N*),那么有:f(n) = f(n-1) + f(n-2)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class FibonacciTest {

class Fibonacci extends RecursiveTask<Integer> {

int n;

public Fibonacci(int n) {
this.n = n;
}

// 主要的实现逻辑都在compute()里
@Override
protected Integer compute() {
// 这里先假设 n >= 0
if (n <= 1) {
return n;
} else {
// f(n-1)
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
// f(n-2)
Fibonacci f2 = new Fibonacci(n - 2);
f2.fork();
// f(n) = f(n-1) + f(n-2)
return f1.join() + f2.join();
}
}
}

@Test
public void testFib() throws ExecutionException, InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
System.out.println("CPU核数:" + Runtime.getRuntime().availableProcessors());
long start = System.currentTimeMillis();
Fibonacci fibonacci = new Fibonacci(40);
Future<Integer> future = forkJoinPool.submit(fibonacci);
System.out.println(future.get());
long end = System.currentTimeMillis();
System.out.println(String.format("耗时:%d millis", end - start));
}


}

上面例子在本机的输出:

1
2
3
CPU核数:4
计算结果:102334155
耗时:9490 millis

需要注意的是,上述计算时间复杂度为O(2^n),随着n的增长计算效率会越来越低,这也是上面的例子中n不敢取太大的原因。

此外,也并不是所有的任务都适合Fork/Join框架,比如上面的例子任务划分过于细小反而体现不出效率,下面我们试试用普通的递归来求f(n)的值,看看是不是要比使用Fork/Join快:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 普通递归,复杂度为O(2^n)
public int plainRecursion(int n) {
if (n == 1 || n == 2) {
return 1;
} else {
return plainRecursion(n -1) + plainRecursion(n - 2);
}
}

@Test
public void testPlain() {
long start = System.currentTimeMillis();
int result = plainRecursion(40);
long end = System.currentTimeMillis();
System.out.println("计算结果:" + result);
System.out.println(String.format("耗时:%d millis", end -start));
}

普通递归的例子输出:

1
2
计算结果:102334155
耗时:436 millis

通过输出可以很明显的看出来,使用普通递归的效率都要比使用Fork/Join框架要高很多。

这里我们再用另一种思路来计算:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 通过循环来计算,复杂度为O(n)
private int computeFibonacci(int n) {
// 假设n >= 0
if (n <= 1) {
return n;
} else {
int first = 1;
int second = 1;
int third = 0;
for (int i = 3; i <= n; i ++) {
// 第三个数是前两个数之和
third = first + second;
// 前两个数右移
first = second;
second = third;
}
return third;
}
}

@Test
public void testComputeFibonacci() {
long start = System.currentTimeMillis();
int result = computeFibonacci(40);
long end = System.currentTimeMillis();
System.out.println("计算结果:" + result);
System.out.println(String.format("耗时:%d millis", end -start));
}

上面例子在笔者所用电脑的输出为:

1
2
计算结果:102334155
耗时:0 millis

这里耗时为0不代表没有耗时,是表明这里计算的耗时几乎可以忽略不计,大家可以在自己的电脑试试,即使是n取大很多量级的数据(注意int溢出的问题)耗时也是很短的,或者可以用System.nanoTime()统计纳秒的时间。

为什么在这里普通的递归或循环效率更快呢?因为Fork/Join是使用多个线程协作来计算的,所以会有线程通信和线程切换的开销。

如果要计算的任务比较简单(比如我们案例中的斐波那契数列),那当然是直接使用单线程会更快一些。但如果要计算的东西比较复杂,计算机又是多核的情况下,就可以充分利用多核CPU来提高计算速度。

另外,Java 8 Stream的并行操作底层就是用到了Fork/Join框架,下一章我们将从源码及案例两方面介绍Java 8 Stream的并行操作。

Java 8 Stream并行计算原理

Java 8 Stream简介

从Java 8 开始,我们可以使用Stream接口以及lambda表达式进行“流式计算”。它可以让我们对集合的操作更加简洁、更加可读、更加高效。

Stream接口有非常多用于集合计算的方法,比如判空操作empty、过滤操作filter、求最max值、查找操作findFirst和findAny等等。

Stream单线程串行计算

Stream接口默认是使用串行的方式,也就是说在一个线程里执行。下面举一个例子:

1
2
3
4
5
6
7
8
9
10
11
public class StreamDemo {
public static void main(String[] args) {
Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
.reduce((a, b) -> {
System.out.println(String.format("%s: %d + %d = %d",
Thread.currentThread().getName(), a, b, a + b));
return a + b;
})
.ifPresent(System.out::println);
}
}

我们来理解一下这个方法。首先我们用整数1~9创建了一个Stream。这里的Stream.of(T… values)方法是Stream接口的一个静态方法,其底层调用的是Arrays.stream(T[] array)方法。

然后我们使用了reduce方法来计算这个集合的累加和。reduce方法这里做的是:从前两个元素开始,进行某种操作(我这里进行的是加法操作)后,返回一个结果,然后再拿这个结果跟第三个元素执行同样的操作,以此类推,直到最后的一个元素。

我们来打印一下当前这个reduce操作的线程以及它们被操作的元素和返回的结果以及最后所有reduce方法的结果,也就代表的是数字1到9的累加和。

main: 1 + 2 = 3
main: 3 + 3 = 6
main: 6 + 4 = 10
main: 10 + 5 = 15
main: 15 + 6 = 21
main: 21 + 7 = 28
main: 28 + 8 = 36
main: 36 + 9 = 45
45

可以看到,默认情况下,它是在一个单线程运行的,也就是main线程。然后每次reduce操作都是串行起来的,首先计算前两个数字的和,然后再往后依次计算。

Stream多线程并行计算

我们思考上面一个例子,是不是一定要在单线程里进行串行地计算呢?假如我的计算机是一个多核计算机,我们在理论上能否利用多核来进行并行计算,提高计算效率呢?

当然可以,比如我们在计算前两个元素1 + 2 = 3的时候,其实我们也可以同时在另一个核计算 3 + 4 = 7。然后等它们都计算完成之后,再计算 3 + 7 = 10的操作。

是不是很熟悉这样的操作手法?没错,它就是ForkJoin框架的思想。下面小小地修改一下上面的代码,增加一行代码,使Stream使用多线程来并行计算:

1
2
3
4
5
6
7
8
9
10
11
12
public class StreamParallelDemo {
public static void main(String[] args) {
Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9)
.parallel()
.reduce((a, b) -> {
System.out.println(String.format("%s: %d + %d = %d",
Thread.currentThread().getName(), a, b, a + b));
return a + b;
})
.ifPresent(System.out::println);
}
}

可以看到,与上一个案例的代码只有一点点区别,就是在reduce方法被调用之前,调用了parallel()方法。下面来看看这个方法的输出:

ForkJoinPool.commonPool-worker-1: 3 + 4 = 7
ForkJoinPool.commonPool-worker-4: 8 + 9 = 17
ForkJoinPool.commonPool-worker-2: 5 + 6 = 11
ForkJoinPool.commonPool-worker-3: 1 + 2 = 3
ForkJoinPool.commonPool-worker-4: 7 + 17 = 24
ForkJoinPool.commonPool-worker-4: 11 + 24 = 35
ForkJoinPool.commonPool-worker-3: 3 + 7 = 10
ForkJoinPool.commonPool-worker-3: 10 + 35 = 45
45

可以很明显地看到,它使用的线程是ForkJoinPool里面的commonPool里面的worker线程。并且它们是并行计算的,并不是串行计算的。但由于Fork/Join框架的作用,它最终能很好的协调计算结果,使得计算结果完全正确。

如果我们用Fork/Join代码去实现这样一个功能,那无疑是非常复杂的。但Java8提供了并行式的流式计算,大大简化了我们的代码量,使得我们只需要写很少很简单的代码就可以利用计算机底层的多核资源。

从源码看Stream并行计算原理

上面我们通过在控制台输出线程的名字,看到了Stream的并行计算底层其实是使用的Fork/Join框架。那它到底是在哪使用Fork/Join的呢?我们从源码上来解析一下上述案例。

Stream.of方法就不说了,它只是生成一个简单的Stream。先来看看parallel()方法的源码。这里由于我的数据是int类型的,所以它其实是使用的BaseStream接口的parallel()方法。而BaseStream接口的JDK唯一实现类是一个叫AbstractPipeline的类。下面我们来看看这个类的parallel()方法的代码:

1
2
3
4
public final S parallel() {
sourceStage.parallel = true;
return (S) this;
}

这个方法很简单,就是把一个标识sourceStage.parallel设置为true。然后返回实例本身。

接着我们再来看reduce这个方法的内部实现。

Stream.reduce()方法的具体实现是交给了ReferencePipeline这个抽象类,它是继承了AbstractPipeline这个类的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// ReferencePipeline抽象类的reduce方法
@Override
public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
// 调用evaluate方法
return evaluate(ReduceOps.makeRef(accumulator));
}

final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;

return isParallel() // 调用isParallel()判断是否使用并行模式
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}

@Override
public final boolean isParallel() {
// 根据之前在parallel()方法设置的那个flag来判断。
return sourceStage.parallel;
}

从它的源码可以知道,reduce方法调用了evaluate方法,而evaluate方法会先去检查当前的flag,是否使用并行模式,如果是则会调用evaluateParallel方法执行并行计算,否则,会调用evaluateSequential方法执行串行计算。

这里我们再看看TerminalOp(注意这里是字母l O,而不是数字1 0)接口的evaluateParallel方法。TerminalOp接口的实现类有这样几个内部类:

  • java.util.stream.FindOps.FindOp
  • java.util.stream.ForEachOps.ForEachOp
  • java.util.stream.MatchOps.MatchOp
  • java.util.stream.ReduceOps.ReduceOp

可以看到,对应的是Stream的几种主要的计算操作。我们这里的示例代码使用的是reduce计算,那我们就看看ReduceOp类的这个方法的源码:

1
2
3
4
5
6
// java.util.stream.ReduceOps.ReduceOp.evaluateParallel
@Override
public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator) {
return new ReduceTask<>(this, helper, spliterator).invoke().get();
}

evaluateParallel方法创建了一个新的ReduceTask实例,并且调用了invoke()方法后再调用get()方法,然后返回这个结果。那这个ReduceTask是什么呢?它的invoke方法内部又是什么呢?

追溯源码我们可以发现,ReduceTask类是ReduceOps类的一个内部类,它继承了AbstractTask类,而AbstractTask类又继承了CountedCompleter类,而CountedCompleter类又继承了ForkJoinTask类!

它们的继承关系如下:

ReduceTask -> AbstractTask -> CountedCompleter -> ForkJoinTask

这里的ReduceTask的invoke方法,其实是调用的ForkJoinTask的invoke方法,中间三层继承并没有覆盖这个方法的实现。

所以这就从源码层面解释了Stream并行的底层原理是使用了Fork/Join框架。

需要注意的是,一个Java进程的Stream并行计算任务默认共享同一个线程池,如果随意的使用并行特性可能会导致方法的吞吐量下降。我们可以通过下面这种方式来让你的某个并行Stream使用自定义的ForkJoin线程池:

1
2
3
ForkJoinPool customThreadPool = new ForkJoinPool(4);
long actualTotal = customThreadPool
.submit(() -> roster.parallelStream().reduce(0, Integer::sum)).get();

Stream并行计算的性能提升

我们可以在本地测试一下如果在多核情况下,Stream并行计算会给我们的程序带来多大的效率上的提升。用以下示例代码来计算一千万个随机数的和:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class StreamParallelDemo {
public static void main(String[] args) {
System.out.println(String.format("本计算机的核数:%d", Runtime.getRuntime().availableProcessors()));

// 产生100w个随机数(1 ~ 100),组成列表
Random random = new Random();
List<Integer> list = new ArrayList<>(1000_0000);

for (int i = 0; i < 1000_0000; i++) {
list.add(random.nextInt(100));
}

long prevTime = getCurrentTime();
list.stream().reduce((a, b) -> a + b).ifPresent(System.out::println);
System.out.println(String.format("单线程计算耗时:%d", getCurrentTime() - prevTime));

prevTime = getCurrentTime();
list.stream().parallel().reduce((a, b) -> a + b).ifPresent(System.out::println);
System.out.println(String.format("多线程计算耗时:%d", getCurrentTime() - prevTime));

}

private static long getCurrentTime() {
return System.currentTimeMillis();
}
}

输出:

本计算机的核数:8
495156156
单线程计算耗时:223
495156156
多线程计算耗时:95

所以在多核的情况下,使用Stream的并行计算确实比串行计算能带来很大效率上的提升,并且也能保证结果计算完全准确。

本文一直在强调的“多核”的情况。其实可以看到,我的本地电脑有8核,但并行计算耗时并不是单线程计算耗时除以8,因为线程的创建、销毁以及维护线程上下文的切换等等都有一定的开销。所以如果你的服务器并不是多核服务器,那也没必要用Stream的并行计算。因为在单核的情况下,往往Stream的串行计算比并行计算更快,因为它不需要线程切换的开销。

计划任务

自JDK 1.5 开始,JDK提供了ScheduledThreadPoolExecutor类用于计划任务(又称定时任务),这个类有两个用途:

  • 在给定的延迟之后运行任务
  • 周期性重复执行任务

在这之前,是使用Timer类来完成定时任务的,但是Timer有缺陷:

  • Timer是单线程模式;
  • 如果在执行任务期间某个TimerTask耗时较久,那么就会影响其它任务的调度;
  • Timer的任务调度是基于绝对时间的,对系统时间敏感;
  • Timer不会捕获执行TimerTask时所抛出的异常,由于Timer是单线程,所以一旦出现异常,则线程就会终止,其他任务也得不到执行。

所以JDK 1.5之后,大家就摒弃Timer,使用ScheduledThreadPoolExecutor吧。

使用案例

假设我有一个需求,指定时间给大家发送消息。那么我们会将消息(包含发送时间)存储在数据库中,然后想用一个定时任务,每隔1秒检查数据库在当前时间有没有需要发送的消息,那这个计划任务怎么写?下面是一个Demo:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ThreadPool {

private static final ScheduledExecutorService executor = new
ScheduledThreadPoolExecutor(1, Executors.defaultThreadFactory());

private static SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

public static void main(String[] args){
// 新建一个固定延迟时间的计划任务
executor.scheduleWithFixedDelay(new Runnable() {
@Override
public void run() {
if (haveMsgAtCurrentTime()) {
System.out.println(df.format(new Date()));
System.out.println("大家注意了,我要发消息了");
}
}
}, 1, 1, TimeUnit.SECONDS);
}

public static boolean haveMsgAtCurrentTime(){
//查询数据库,有没有当前时间需要发送的消息
//这里省略实现,直接返回true
return true;
}
}

下面截取前面的输出(这个demo会一直运行下去):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
2019-01-23 16:16:48
大家注意了,我要发消息了
2019-01-23 16:16:49
大家注意了,我要发消息了
2019-01-23 16:16:50
大家注意了,我要发消息了
2019-01-23 16:16:51
大家注意了,我要发消息了
2019-01-23 16:16:52
大家注意了,我要发消息了
2019-01-23 16:16:53
大家注意了,我要发消息了
2019-01-23 16:16:54
大家注意了,我要发消息了
2019-01-23 16:16:55
大家注意了,我要发消息了

这就是ScheduledThreadPoolExecutor的一个简单运用,想要知道奥秘,接下来的东西需要仔细的看哦。

类结构

1
2
3
4
5
6
7
8
9
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor
implements ScheduledExecutorService {

public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
//……
}

ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,实现了ScheduledExecutorService。 线程池在之前的章节介绍过了,我们先看看ScheduledExecutorService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public interface ScheduledExecutorService extends ExecutorService {

public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);

public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}

ScheduledExecutorService实现了ExecutorService ,并增加若干定时相关的接口。 前两个方法用于单次调度执行任务,区别是有没有返回值。

重点理解一下后面两个方法:

  • scheduleAtFixedRate

    该方法在initialDelay时长后第一次执行任务,以后每隔period时长,再次执行任务。注意,period是从任务开始执行算起的。开始执行任务后,定时器每隔period时长检查该任务是否完成,如果完成则再次启动任务,否则等该任务结束后才再次启动任务。

  • scheduleWithFixDelay

    该方法在initialDelay时长后第一次执行任务,以后每当任务执行完成后,等待delay时长,再次执行任务。

主要方法介绍

schedule

1
2
3
4
5
6
7
8
9
10
11
// delay时长后执行任务command,该任务只执行一次
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
// 这里的decorateTask方法仅仅返回第二个参数
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null, triggerTime(delay,unit)));
// 延时或者周期执行任务的主要方法,稍后统一说明
delayedExecute(t);
return t;
}

我们先看看里面涉及到的几个类和接口ScheduledFutureRunnableScheduledFutureScheduledFutureTask的关系:

类图

我们先看看这几个接口和类:

Delayed接口

1
2
3
4
5
// 继承Comparable接口,表示该类对象支持排序
public interface Delayed extends Comparable<Delayed> {
// 返回该对象剩余时延
long getDelay(TimeUnit unit);
}

Delayed接口很简单,继承了Comparable接口,表示对象是可以比较排序的。

ScheduledFuture接口

1
2
3
// 仅仅继承了Delayed和Future接口,自己没有任何代码
public interface ScheduledFuture<V> extends Delayed, Future<V> {
}

没有添加其他方法。

RunnableScheduledFuture接口

1
2
3
4
public interface RunnableScheduledFuture<V> extends RunnableFuture<V>, ScheduledFuture<V> {      
// 是否是周期任务,周期任务可被调度运行多次,非周期任务只被运行一次
boolean isPeriodic();
}

ScheduledFutureTask类

回到schecule方法中,它创建了一个ScheduledFutureTask的对象,由上面的关系图可知,ScheduledFutureTask直接或者间接实现了很多接口,一起看看ScheduledFutureTask里面的实现方法吧。

构造方法

1
2
3
4
5
6
7
8
9
10
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
// 调用父类FutureTask的构造方法
super(r, result);
// time表示任务下次执行的时间
this.time = ns;
// 周期任务,正数表示按照固定速率,负数表示按照固定时延,0表示不是周期任务
this.period = period;
// 任务的编号
this.sequenceNumber = sequencer.getAndIncrement();
}

Delayed接口的实现

1
2
3
4
// 实现Delayed接口的getDelay方法,返回任务开始执行的剩余时间
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), TimeUnit.NANOSECONDS);
}

Comparable接口的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// Comparable接口的compareTo方法,比较两个任务的”大小”。
public int compareTo(Delayed other) {
if (other == this)
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
// 小于0,说明当前任务的执行时间点早于other,要排在延时队列other的前面
if (diff < 0)
return -1;
// 大于0,说明当前任务的执行时间点晚于other,要排在延时队列other的后面
else if (diff > 0)
return 1;
// 如果两个任务的执行时间点一样,比较两个任务的编号,编号小的排在队列前面,编号大的排在队列后面
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
// 如果任务类型不是ScheduledFutureTask,通过getDelay方法比较
long d = (getDelay(TimeUnit.NANOSECONDS) -
other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}

setNextRunTime

1
2
3
4
5
6
7
8
9
10
11
12
// 任务执行完后,设置下次执行的时间
private void setNextRunTime() {
long p = period;
// p > 0,说明是固定速率运行的任务
// 在原来任务开始执行时间的基础上加上p即可
if (p > 0)
time += p;
// p < 0,说明是固定时延运行的任务,
// 下次执行时间在当前时间(任务执行完成的时间)的基础上加上-p的时间
else
time = triggerTime(-p);
}

Runnable接口实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void run() {
boolean periodic = isPeriodic();
// 如果当前状态下不能执行任务,则取消任务
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 不是周期性任务,执行一次任务即可,调用父类的run方法
else if (!periodic)
ScheduledFutureTask.super.run();
// 是周期性任务,调用FutureTask的runAndReset方法,方法执行完成后
// 重新设置任务下一次执行的时间,并将该任务重新入队,等待再次被调度
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}

总结一下run方法的执行过程:

  1. 如果当前线程池运行状态不可以执行任务,取消该任务,然后直接返回,否则执行步骤2;
  2. 如果不是周期性任务,调用FutureTask中的run方法执行,会设置执行结果,然后直接返回,否则执行步骤3;
  3. 如果是周期性任务,调用FutureTask中的runAndReset方法执行,不会设置执行结果,然后直接返回,否则执行步骤4和步骤5;
  4. 计算下次执行该任务的具体时间;
  5. 重复执行任务。

runAndReset方法是为任务多次执行而设计的。runAndReset方法执行完任务后不会设置任务的执行结果,也不会去更新任务的状态,维持任务的状态为初始状态(NEW状态),这也是该方法和FutureTaskrun方法的区别。

scheduledAtFixedRate

我们看一下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// 注意,固定速率和固定时延,传入的参数都是Runnable,也就是说这种定时任务是没有返回值的
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
// 创建一个有初始延时和固定周期的任务
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
// outerTask表示将会重新入队的任务
sft.outerTask = t;
// 稍后说明
delayedExecute(t);
return t;
}

scheduleAtFixedRate这个方法和schedule类似,不同点是scheduleAtFixedRate方法内部创建的是ScheduledFutureTask,带有初始延时和固定周期的任务 。

scheduledAtFixedDelay

FixedDelay也是通过ScheduledFutureTask体现的,唯一不同的地方在于创建的ScheduledFutureTask不同 。这里不再展示源码。

delayedExecute

前面讲到的schedulescheduleAtFixedRatescheduleAtFixedDelay最后都调用了delayedExecute方法,该方法是定时任务执行的主要方法。 一起来看看源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 线程池已经关闭,调用拒绝执行处理器处理
if (isShutdown())
reject(task);
else {
// 将任务加入到等待队列
super.getQueue().add(task);
// 线程池已经关闭,且当前状态不能运行该任务,将该任务从等待队列移除并取消该任务
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 增加一个worker,就算corePoolSize=0也要增加一个worker
ensurePrestart();
}
}

delayedExecute方法的逻辑也很简单,主要就是将任务添加到等待队列,然后调用ensurePrestart方法。

1
2
3
4
5
6
7
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}

ensurePrestart方法主要是调用了addWorker,线程池中的工作线程是通过该方法来启动并执行任务的。 具体可以查看前面讲的线程池章节。

对于ScheduledThreadPoolExecutorworker添加到线程池后会在等待队列上等待获取任务,这点是和ThreadPoolExecutor一致的。但是worker是怎么从等待队列取定时任务的?

因为ScheduledThreadPoolExecutor使用了DelayedWorkQueue保存等待的任务,该等待队列队首应该保存的是最近将要执行的任务,如果队首任务的开始执行时间还未到,worker也应该继续等待。

DelayedWorkQueue

ScheduledThreadPoolExecutor使用了DelayedWorkQueue保存等待的任务。

该等待队列队首应该保存的是最近将要执行的任务,所以worker只关心队首任务即可,如果队首任务的开始执行时间还未到,worker也应该继续等待。

DelayedWorkQueue是一个无界优先队列,使用数组存储,底层是使用堆结构来实现优先队列的功能。我们先看看DelayedWorkQueue的声明和成员变量:

1
2
3
4
5
6
7
8
9
10
11
12
13
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
// 队列初始容量
private static final int INITIAL_CAPACITY = 16;
// 数组用来存储定时任务,通过数组实现堆排序
private RunnableScheduledFuture[] queue = new RunnableScheduledFuture[INITIAL_CAPACITY];
// 当前在队首等待的线程
private Thread leader = null;
// 锁和监视器,用于leader线程
private final ReentrantLock lock = new ReentrantLock();
private final Condition available = lock.newCondition();
// 其他代码,略
}

当一个线程成为leader,它只要等待队首任务的delay时间即可,其他线程会无条件等待。leader取到任务返回前要通知其他线程,直到有线程成为新的leader。每当队首的定时任务被其他更早需要执行的任务替换时,leader设置为null,其他等待的线程(被当前leader通知)和当前的leader重新竞争成为leader。

同时,定义了锁lock和监视器available用于线程竞争成为leader。

当一个新的任务成为队首,或者需要有新的线程成为leader时,available监视器上的线程将会被通知,然后竞争成为leader线程。 有些类似于生产者-消费者模式。

接下来看看DelayedWorkQueue中几个比较重要的方法

take

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public RunnableScheduledFuture take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 取堆顶的任务,堆顶是最近要执行的任务
RunnableScheduledFuture first = queue[0];
// 堆顶为空,线程要在条件available上等待
if (first == null)
available.await();
else {
// 堆顶任务还要多长时间才能执行
long delay = first.getDelay(TimeUnit.NANOSECONDS);
// 堆顶任务已经可以执行了,finishPoll会重新调整堆,使其满足最小堆特性,该方法设置任务在
// 堆中的index为-1并返回该任务
if (delay <= 0)
return finishPoll(first);
// 如果leader不为空,说明已经有线程成为leader并等待堆顶任务
// 到达执行时间,此时,其他线程都需要在available条件上等待
else if (leader != null)
available.await();
else {
// leader为空,当前线程成为新的leader
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 当前线程已经成为leader了,只需要等待堆顶任务到达执行时间即可
available.awaitNanos(delay);
} finally {
// 返回堆顶元素之前将leader设置为空
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 通知其他在available条件等待的线程,这些线程可以去竞争成为新的leader
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}

take方法是什么时候调用的呢?在线程池的章节中,介绍了getTask方法,工作线程会循环地从workQueue中取任务。但计划任务却不同,因为如果一旦getTask方法取出了任务就开始执行了,而这时可能还没有到执行的时间,所以在take方法中,要保证只有在到指定的执行时间的时候任务才可以被取走。

总结一下流程:

  1. 如果堆顶元素为空,在available条件上等待。
  2. 如果堆顶任务的执行时间已到,将堆顶元素替换为堆的最后一个元素并调整堆使其满足最小堆特性,同时设置任务在堆中索引为-1,返回该任务。
  3. 如果leader不为空,说明已经有线程成为leader了,其他线程都要在available监视器上等待。
  4. 如果leader为空,当前线程成为新的leader,并等待直到堆顶任务执行时间到达。
  5. take方法返回之前,将leader设置为空,并通知其他线程。

再来说一下leader的作用,这里的leader是为了减少不必要的定时等待,当一个线程成为leader时,它只等待下一个节点的时间间隔,但其它线程无限期等待。 leader线程必须在从take()poll()返回之前signal其它线程,除非其他线程成为了leader。

举例来说,如果没有leader,那么在执行take时,都要执行available.awaitNanos(delay),假设当前线程执行了该段代码,这时还没有signal,第二个线程也执行了该段代码,则第二个线程也要被阻塞。但只有一个线程返回队首任务,其他的线程在awaitNanos(delay)之后,继续执行for循环,因为队首任务已经被返回了,所以这个时候的for循环拿到的队首任务是新的,又需要重新判断时间,又要继续阻塞。

所以,为了不让多个线程频繁的做无用的定时等待,这里增加了leader,如果leader不为空,则说明队列中第一个节点已经在等待出队,这时其它的线程会一直阻塞,减少了无用的阻塞(注意,在finally中调用了signal()来唤醒一个线程,而不是signalAll())。

offer

该方法往队列插入一个值,返回是否成功插入 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture e = (RunnableScheduledFuture)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
// 队列元素已经大于等于数组的长度,需要扩容,新堆的容量是原来堆容量的1.5倍
if (i >= queue.length)
grow();
// 堆中元素增加1
size = i + 1;
// 调整堆
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
// 调整堆,使的满足最小堆,比较大小的方式就是上文提到的compareTo方法
siftUp(i, e);
}
if (queue[0] == e) {
leader = null;
// 通知其他在available条件上等待的线程,这些线程可以竞争成为新的leader
available.signal();
}
} finally {
lock.unlock();
}
return true;
}

在堆中插入了一个节点,这个时候堆有可能不满足最小堆的定义,siftUp用于将堆调整为最小堆,这属于数据结构的基本内容,本文不做介绍。

总结

内部使用优化的DelayQueue来实现,由于使用队列来实现定时器,有出入队调整堆等操作,所以定时并不是非常非常精确。